Revert "Dynamic erasure (#4653)"

This reverts commit ada4d16c4c.
Author: Michael Vines
Date: 2019-06-20 20:15:33 -07:00
Parent: aa0f8538ed
Commit: 36c9e22e3d

14 changed files with 1064 additions and 1118 deletions

File diff suppressed because it is too large.


@@ -39,10 +39,6 @@ pub mod columns {
     #[derive(Debug)]
     /// The root column
     pub struct Root;
-
-    #[derive(Debug)]
-    /// The index column
-    pub struct Index;
 }

 pub trait Backend: Sized + Send + Sync {


@@ -71,21 +71,14 @@ impl Backend for Kvs {
 impl Column<Kvs> for cf::Coding {
     const NAME: &'static str = super::ERASURE_CF;
-    type Index = (u64, u64, u64);
+    type Index = (u64, u64);

-    fn key((slot, set_index, index): (u64, u64, u64)) -> Vec<u8> {
-        let mut key = Key::default();
-        BigEndian::write_u64(&mut key.0[..8], slot);
-        BigEndian::write_u64(&mut key.0[8..16], set_index);
-        BigEndian::write_u64(&mut key.0[16..], index);
-        key
+    fn key(index: (u64, u64)) -> Key {
+        cf::Data::key(index)
     }

-    fn index(key: &Key) -> (u64, u64, u64) {
-        let slot = BigEndian::read_u64(&key.0[..8]);
-        let set_index = BigEndian::read_u64(&key.0[8..16]);
-        let index = BigEndian::read_u64(&key.0[16..]);
-        (slot, set_index, index)
+    fn index(key: &Key) -> (u64, u64) {
+        cf::Data::index(key)
     }
 }
@@ -179,12 +172,8 @@ impl Column<Kvs> for cf::SlotMeta {
     }
 }

-impl TypedColumn<Kvs> for cf::SlotMeta {
-    type Type = super::SlotMeta;
-}
-
-impl Column<Kvs> for cf::Index {
-    const NAME: &'static str = super::INDEX_CF;
+impl Column<Kvs> for cf::SlotMeta {
+    const NAME: &'static str = super::META_CF;
     type Index = u64;

     fn key(slot: u64) -> Key {
@@ -198,8 +187,8 @@ impl Column<Kvs> for cf::Index {
     }
 }

-impl TypedColumn<Kvs> for cf::Index {
-    type Type = crate::blocktree::meta::Index;
+impl TypedColumn<Kvs> for cf::SlotMeta {
+    type Type = super::SlotMeta;
 }

 impl Column<Kvs> for cf::ErasureMeta {
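For context (not part of the commit): both backends write these column keys big-endian because the database compares keys byte-wise, and big-endian u64s make byte-wise order match numeric order. A minimal standalone sketch, assuming the restored two-field (slot, index) layout:

use byteorder::{BigEndian, ByteOrder};

fn key((slot, index): (u64, u64)) -> Vec<u8> {
    let mut key = vec![0; 16];
    BigEndian::write_u64(&mut key[..8], slot);
    BigEndian::write_u64(&mut key[8..], index);
    key
}

fn main() {
    // Byte-wise order matches numeric order, so iteration walks (slot, index)
    // in increasing order. Little-endian keys would sort 256 ([0x00, 0x01, ..])
    // before 1 ([0x01, 0x00, ..]) and break that property.
    assert!(key((1, 2)) < key((1, 10)));
    assert!(key((1, std::u64::MAX)) < key((2, 0)));
}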


@@ -1,6 +1,6 @@
-use crate::erasure::CodingHeader;
+use crate::erasure::{NUM_CODING, NUM_DATA};
 use solana_metrics::datapoint;
-use std::{collections::BTreeMap, ops::RangeBounds};
+use std::borrow::Borrow;

 #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
 // The Meta column family
@@ -27,43 +27,6 @@ pub struct SlotMeta {
     pub is_connected: bool,
 }

-#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
-/// Index recording presence/absence of blobs
-pub struct Index {
-    pub slot: u64,
-    data: DataIndex,
-    coding: CodingIndex,
-}
-
-#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
-pub struct DataIndex {
-    slot: u64,
-    /// Map representing presence/absence of data blobs
-    index: BTreeMap<u64, bool>,
-}
-
-#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
-/// Erasure coding information
-pub struct CodingIndex {
-    slot: u64,
-    /// Map from set index, to hashmap from blob index to presence bool
-    index: BTreeMap<u64, BTreeMap<u64, bool>>,
-}
-
-#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, PartialEq)]
-/// Erasure coding information
-pub struct ErasureMeta {
-    header: CodingHeader,
-    set_index: u64,
-}
-
-#[derive(Debug, PartialEq)]
-pub enum ErasureMetaStatus {
-    CanRecover,
-    DataFull,
-    StillNeed(usize),
-}
-
 impl SlotMeta {
     pub fn is_full(&self) -> bool {
         // last_index is std::u64::MAX when it has no information about how
@@ -109,174 +72,274 @@ impl SlotMeta {
     }
 }

-impl Index {
-    pub(in crate::blocktree) fn new(slot: u64) -> Self {
-        Index {
-            slot,
-            data: DataIndex::default(),
-            coding: CodingIndex::default(),
-        }
-    }
-
-    pub fn data(&self) -> &DataIndex {
-        &self.data
-    }
-    pub fn coding(&self) -> &CodingIndex {
-        &self.coding
-    }
-    pub fn data_mut(&mut self) -> &mut DataIndex {
-        &mut self.data
-    }
-    pub fn coding_mut(&mut self) -> &mut CodingIndex {
-        &mut self.coding
-    }
-}
-
-impl CodingIndex {
-    pub fn is_set_present(&self, set_index: u64) -> bool {
-        self.index.contains_key(&set_index)
-    }
-
-    pub fn present_in_set(&self, set_index: u64) -> usize {
-        match self.index.get(&set_index) {
-            Some(map) => map.values().filter(|presence| **presence).count(),
-            None => 0,
-        }
-    }
-
-    pub fn is_present(&self, set_index: u64, blob_index: u64) -> bool {
-        match self.index.get(&set_index) {
-            Some(map) => *map.get(&blob_index).unwrap_or(&false),
-            None => false,
-        }
-    }
-
-    pub fn set_present(&mut self, set_index: u64, blob_index: u64, present: bool) {
-        let set_map = self
-            .index
-            .entry(set_index)
-            .or_insert_with(BTreeMap::default);
-
-        set_map.insert(blob_index, present);
-    }
-}
-
-impl DataIndex {
-    pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
-        self.index
-            .range(bounds)
-            .filter(|(_, presence)| **presence)
-            .count()
-    }
-
-    pub fn is_present(&self, index: u64) -> bool {
-        *self.index.get(&index).unwrap_or(&false)
-    }
-
-    pub fn set_present(&mut self, index: u64, presence: bool) {
-        self.index.insert(index, presence);
-    }
-}
-
-impl ErasureMeta {
-    pub(in crate::blocktree) fn new(set_index: u64) -> ErasureMeta {
-        ErasureMeta {
-            header: CodingHeader::default(),
-            set_index,
-        }
-    }
-
-    pub fn session_info(&self) -> CodingHeader {
-        self.header
-    }
-
-    pub fn set_session_info(&mut self, header: CodingHeader) {
-        self.header = header;
-    }
-
-    pub fn status(&self, index: &Index) -> ErasureMetaStatus {
-        use ErasureMetaStatus::*;
-
-        let start_idx = self.header.start_index;
-        let end_idx = start_idx + self.header.data_count as u64;
-
-        let num_coding = index.coding().present_in_set(self.header.set_index);
-        let num_data = index.data().present_in_bounds(start_idx..end_idx);
-
-        assert!(self.header.shard_size != 0);
-
-        let (data_missing, coding_missing) = (
-            self.header.data_count - num_data,
-            self.header.parity_count - num_coding,
-        );
-
-        let total_missing = data_missing + coding_missing;
-
-        if data_missing > 0 && total_missing <= self.header.parity_count {
-            CanRecover
-        } else if data_missing == 0 {
-            DataFull
-        } else {
-            StillNeed(total_missing - self.header.parity_count)
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-
-    const NUM_DATA: u64 = 7;
-    const NUM_CODING: u64 = 8;
-
-    fn sample_header() -> CodingHeader {
-        CodingHeader {
-            shard_size: 1,
-            data_count: NUM_DATA as usize,
-            parity_count: NUM_CODING as usize,
-            ..CodingHeader::default()
-        }
-    }
-
-    #[test]
-    fn test_erasure_meta_status() {
-        let set_index = 0;
-
-        let header = sample_header();
-        let mut e_meta = ErasureMeta::new(set_index);
-        e_meta.set_session_info(header);
-
-        let mut index = Index::new(0);
-
-        assert_eq!(e_meta.status(&index), ErasureMetaStatus::StillNeed(7));
-
-        for i in 0..NUM_DATA {
-            index.data_mut().set_present(i, true);
-        }
-
-        assert_eq!(e_meta.status(&index), ErasureMetaStatus::DataFull);
-
-        index.data_mut().set_present(NUM_DATA - 1, false);
-
-        assert_eq!(e_meta.status(&index), ErasureMetaStatus::StillNeed(1));
-
-        for i in 0..NUM_DATA - 2 {
-            index.data_mut().set_present(i, false);
-        }
-
-        assert_eq!(e_meta.status(&index), ErasureMetaStatus::StillNeed(6));
-
-        for i in 0..NUM_CODING {
-            index.coding_mut().set_present(set_index, i, true);
-        }
-
-        index.data_mut().set_present(NUM_DATA - 1, false);
-
-        for i in 0..NUM_DATA - 1 {
-            index.data_mut().set_present(i, true);
-
-            assert_eq!(e_meta.status(&index), ErasureMetaStatus::CanRecover);
-        }
-    }
-}
+#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
+/// Erasure coding information
+pub struct ErasureMeta {
+    /// Which erasure set in the slot this is
+    pub set_index: u64,
+    /// Size of shards in this erasure set
+    pub size: usize,
+    /// Bitfield representing presence/absence of data blobs
+    data: u64,
+    /// Bitfield representing presence/absence of coding blobs
+    coding: u64,
+}
+
+#[derive(Debug, PartialEq)]
+pub enum ErasureMetaStatus {
+    CanRecover,
+    DataFull,
+    StillNeed(usize),
+}
+
+impl ErasureMeta {
+    pub fn new(set_index: u64) -> ErasureMeta {
+        ErasureMeta {
+            set_index,
+            size: 0,
+            data: 0,
+            coding: 0,
+        }
+    }
+
+    pub fn status(&self) -> ErasureMetaStatus {
+        let (data_missing, coding_missing) =
+            (NUM_DATA - self.num_data(), NUM_CODING - self.num_coding());
+        if data_missing > 0 && data_missing + coding_missing <= NUM_CODING {
+            assert!(self.size != 0);
+            ErasureMetaStatus::CanRecover
+        } else if data_missing == 0 {
+            ErasureMetaStatus::DataFull
+        } else {
+            ErasureMetaStatus::StillNeed(data_missing + coding_missing - NUM_CODING)
+        }
+    }
+
+    pub fn num_coding(&self) -> usize {
+        self.coding.count_ones() as usize
+    }
+
+    pub fn num_data(&self) -> usize {
+        self.data.count_ones() as usize
+    }
+
+    pub fn is_coding_present(&self, index: u64) -> bool {
+        if let Some(position) = self.data_index_in_set(index) {
+            self.coding & (1 << position) != 0
+        } else {
+            false
+        }
+    }
+
+    pub fn set_size(&mut self, size: usize) {
+        self.size = size;
+    }
+
+    pub fn size(&self) -> usize {
+        self.size
+    }
+
+    pub fn set_coding_present(&mut self, index: u64, present: bool) {
+        if let Some(position) = self.data_index_in_set(index) {
+            if present {
+                self.coding |= 1 << position;
+            } else {
+                self.coding &= !(1 << position);
+            }
+        }
+    }
+
+    pub fn is_data_present(&self, index: u64) -> bool {
+        if let Some(position) = self.data_index_in_set(index) {
+            self.data & (1 << position) != 0
+        } else {
+            false
+        }
+    }
+
+    pub fn set_data_present(&mut self, index: u64, present: bool) {
+        if let Some(position) = self.data_index_in_set(index) {
+            if present {
+                self.data |= 1 << position;
+            } else {
+                self.data &= !(1 << position);
+            }
+        }
+    }
+
+    pub fn set_data_multi<I, Idx>(&mut self, indexes: I, present: bool)
+    where
+        I: IntoIterator<Item = Idx>,
+        Idx: Borrow<u64>,
+    {
+        for index in indexes.into_iter() {
+            self.set_data_present(*index.borrow(), present);
+        }
+    }
+
+    pub fn set_coding_multi<I, Idx>(&mut self, indexes: I, present: bool)
+    where
+        I: IntoIterator<Item = Idx>,
+        Idx: Borrow<u64>,
+    {
+        for index in indexes.into_iter() {
+            self.set_coding_present(*index.borrow(), present);
+        }
+    }
+
+    pub fn set_index_for(index: u64) -> u64 {
+        index / NUM_DATA as u64
+    }
+
+    pub fn data_index_in_set(&self, index: u64) -> Option<u64> {
+        let set_index = Self::set_index_for(index);
+
+        if set_index == self.set_index {
+            Some(index - self.start_index())
+        } else {
+            None
+        }
+    }
+
+    pub fn coding_index_in_set(&self, index: u64) -> Option<u64> {
+        self.data_index_in_set(index).map(|i| i + NUM_DATA as u64)
+    }
+
+    pub fn start_index(&self) -> u64 {
+        self.set_index * NUM_DATA as u64
+    }
+
+    /// returns a tuple of (data_end, coding_end)
+    pub fn end_indexes(&self) -> (u64, u64) {
+        let start = self.start_index();
+        (start + NUM_DATA as u64, start + NUM_CODING as u64)
+    }
+}
+
+#[test]
+fn test_meta_indexes() {
+    use rand::{thread_rng, Rng};
+    // to avoid casts everywhere
+    const NUM_DATA: u64 = crate::erasure::NUM_DATA as u64;
+
+    let mut rng = thread_rng();
+
+    for _ in 0..100 {
+        let set_index = rng.gen_range(0, 1_000);
+        let blob_index = (set_index * NUM_DATA) + rng.gen_range(0, NUM_DATA);
+
+        assert_eq!(set_index, ErasureMeta::set_index_for(blob_index));
+        let e_meta = ErasureMeta::new(set_index);
+
+        assert_eq!(e_meta.start_index(), set_index * NUM_DATA);
+        let (data_end_idx, coding_end_idx) = e_meta.end_indexes();
+        assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA);
+        assert_eq!(coding_end_idx, set_index * NUM_DATA + NUM_CODING as u64);
+    }
+
+    let mut e_meta = ErasureMeta::new(0);
+
+    assert_eq!(e_meta.data_index_in_set(0), Some(0));
+    assert_eq!(e_meta.data_index_in_set(NUM_DATA / 2), Some(NUM_DATA / 2));
+    assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), Some(NUM_DATA - 1));
+    assert_eq!(e_meta.data_index_in_set(NUM_DATA), None);
+    assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None);
+
+    e_meta.set_index = 1;
+
+    assert_eq!(e_meta.data_index_in_set(0), None);
+    assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), None);
+    assert_eq!(e_meta.data_index_in_set(NUM_DATA), Some(0));
+    assert_eq!(
+        e_meta.data_index_in_set(NUM_DATA * 2 - 1),
+        Some(NUM_DATA - 1)
+    );
+    assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None);
+}
+
+#[test]
+fn test_meta_coding_present() {
+    let mut e_meta = ErasureMeta::default();
+
+    e_meta.set_coding_multi(0..NUM_CODING as u64, true);
+    for i in 0..NUM_CODING as u64 {
+        assert_eq!(e_meta.is_coding_present(i), true);
+    }
+    for i in NUM_CODING as u64..NUM_DATA as u64 {
+        assert_eq!(e_meta.is_coding_present(i), false);
+    }
+
+    e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64);
+    let start_idx = e_meta.start_index();
+    e_meta.set_coding_multi(start_idx..start_idx + NUM_CODING as u64, true);
+
+    for i in start_idx..start_idx + NUM_CODING as u64 {
+        e_meta.set_coding_present(i, true);
+        assert_eq!(e_meta.is_coding_present(i), true);
+    }
+    for i in start_idx + NUM_CODING as u64..start_idx + NUM_DATA as u64 {
+        assert_eq!(e_meta.is_coding_present(i), false);
+    }
+}
+
+#[test]
+fn test_erasure_meta_status() {
+    use rand::{seq::SliceRandom, thread_rng};
+    // Local constants just used to avoid repetitive casts
+    const N_DATA: u64 = crate::erasure::NUM_DATA as u64;
+    const N_CODING: u64 = crate::erasure::NUM_CODING as u64;
+
+    let mut e_meta = ErasureMeta::default();
+    let mut rng = thread_rng();
+    let data_indexes: Vec<u64> = (0..N_DATA).collect();
+    let coding_indexes: Vec<u64> = (0..N_CODING).collect();
+
+    assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(NUM_DATA));
+
+    e_meta.set_data_multi(0..N_DATA, true);
+
+    assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
+
+    e_meta.size = 1;
+    e_meta.set_coding_multi(0..N_CODING, true);
+
+    assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
+
+    for &idx in data_indexes.choose_multiple(&mut rng, NUM_CODING) {
+        e_meta.set_data_present(idx, false);
+
+        assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
+    }
+
+    e_meta.set_data_multi(0..N_DATA, true);
+
+    for &idx in coding_indexes.choose_multiple(&mut rng, NUM_CODING) {
+        e_meta.set_coding_present(idx, false);
+
+        assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
+    }
+}
+
+#[test]
+fn test_meta_data_present() {
+    let mut e_meta = ErasureMeta::default();
+
+    e_meta.set_data_multi(0..NUM_DATA as u64, true);
+    for i in 0..NUM_DATA as u64 {
+        assert_eq!(e_meta.is_data_present(i), true);
+    }
+    for i in NUM_DATA as u64..2 * NUM_DATA as u64 {
+        assert_eq!(e_meta.is_data_present(i), false);
+    }
+
+    e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 23) as u64);
+    let start_idx = e_meta.start_index();
+    e_meta.set_data_multi(start_idx..start_idx + NUM_DATA as u64, true);
+
+    for i in start_idx..start_idx + NUM_DATA as u64 {
+        assert_eq!(e_meta.is_data_present(i), true);
+    }
+    for i in start_idx - NUM_DATA as u64..start_idx {
+        assert_eq!(e_meta.is_data_present(i), false);
+    }
+}
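For context (not part of the commit): the restored ErasureMeta tracks presence with two u64 bitfields and derives its status from popcounts. A self-contained sketch of that bookkeeping, assuming the restored constants NUM_DATA = NUM_CODING = 8:

const NUM_DATA: usize = 8;
const NUM_CODING: usize = 8;

#[derive(Default)]
struct Presence {
    data: u64,   // bit i set => data blob i of the set is present
    coding: u64, // bit i set => coding blob i of the set is present
}

impl Presence {
    fn set(field: &mut u64, pos: u64, present: bool) {
        if present {
            *field |= 1 << pos;
        } else {
            *field &= !(1 << pos);
        }
    }

    // Reed-Solomon can rebuild the set as long as the total number of
    // missing shards (data + coding) does not exceed the coding count.
    fn can_recover(&self) -> bool {
        let data_missing = NUM_DATA - self.data.count_ones() as usize;
        let coding_missing = NUM_CODING - self.coding.count_ones() as usize;
        data_missing > 0 && data_missing + coding_missing <= NUM_CODING
    }
}

fn main() {
    let mut p = Presence::default();
    for i in 0..8 {
        Presence::set(&mut p.data, i, true);
        Presence::set(&mut p.coding, i, true);
    }
    Presence::set(&mut p.data, 3, false); // lose one data blob
    Presence::set(&mut p.coding, 5, false); // and one coding blob
    assert!(p.can_recover()); // 2 missing <= 8 coding shards
}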


@@ -30,9 +30,7 @@ impl Backend for Rocks {
     type Error = rocksdb::Error;

     fn open(path: &Path) -> Result<Rocks> {
-        use crate::blocktree::db::columns::{
-            Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, SlotMeta,
-        };
+        use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta};

         fs::create_dir_all(&path)?;
@@ -42,14 +40,12 @@
         // Column family names
         let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
         let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
-        let dead_slots_cf_descriptor =
-            ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
+        let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
         let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
         let erasure_meta_cf_descriptor =
             ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
         let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
         let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
-        let index_desc = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options());

         let cfs = vec![
             meta_cf_descriptor,
@@ -59,7 +55,6 @@ impl Backend for Rocks {
             erasure_meta_cf_descriptor,
             orphans_cf_descriptor,
             root_cf_descriptor,
-            index_desc,
         ];

         // Open the database
@@ -69,15 +64,12 @@ impl Backend for Rocks {
     }

     fn columns(&self) -> Vec<&'static str> {
-        use crate::blocktree::db::columns::{
-            Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, SlotMeta,
-        };
+        use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta};

         vec![
             Coding::NAME,
             ErasureMeta::NAME,
             DeadSlots::NAME,
-            Index::NAME,
             Data::NAME,
             Orphans::NAME,
             Root::NAME,
@@ -143,21 +135,14 @@ impl Backend for Rocks {
 impl Column<Rocks> for cf::Coding {
     const NAME: &'static str = super::ERASURE_CF;
-    type Index = (u64, u64, u64);
+    type Index = (u64, u64);

-    fn key((slot, set_index, index): (u64, u64, u64)) -> Vec<u8> {
-        let mut key = vec![0; 24];
-        BigEndian::write_u64(&mut key[..8], slot);
-        BigEndian::write_u64(&mut key[8..16], set_index);
-        BigEndian::write_u64(&mut key[16..], index);
-        key
+    fn key(index: (u64, u64)) -> Vec<u8> {
+        cf::Data::key(index)
     }

-    fn index(key: &[u8]) -> (u64, u64, u64) {
-        let slot = BigEndian::read_u64(&key[..8]);
-        let set_index = BigEndian::read_u64(&key[8..16]);
-        let index = BigEndian::read_u64(&key[16..]);
-        (slot, set_index, index)
+    fn index(key: &[u8]) -> (u64, u64) {
+        cf::Data::index(key)
     }
 }
@@ -278,25 +263,6 @@ impl TypedColumn<Rocks> for cf::ErasureMeta {
     type Type = super::ErasureMeta;
 }

-impl Column<Rocks> for cf::Index {
-    const NAME: &'static str = super::INDEX_CF;
-    type Index = u64;
-
-    fn key(slot: u64) -> Vec<u8> {
-        let mut key = vec![0; 8];
-        BigEndian::write_u64(&mut key[..], slot);
-        key
-    }
-
-    fn index(key: &[u8]) -> u64 {
-        BigEndian::read_u64(&key[..8])
-    }
-}
-
-impl TypedColumn<Rocks> for cf::Index {
-    type Type = crate::blocktree::meta::Index;
-}
-
 impl DbCursor<Rocks> for DBRawIterator {
     fn valid(&self) -> bool {
         DBRawIterator::valid(self)


@@ -3,6 +3,7 @@ use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastR
 use self::standard_broadcast_run::StandardBroadcastRun;
 use crate::blocktree::Blocktree;
 use crate::cluster_info::{ClusterInfo, ClusterInfoError};
+use crate::erasure::CodingGenerator;
 use crate::poh_recorder::WorkingBankEntries;
 use crate::result::{Error, Result};
 use crate::service::Service;
@@ -79,6 +80,7 @@ trait BroadcastRun {
 }

 struct Broadcast {
+    coding_generator: CodingGenerator,
     thread_pool: ThreadPool,
 }
@@ -113,7 +115,10 @@ impl BroadcastStage {
         blocktree: &Arc<Blocktree>,
         mut broadcast_stage_run: impl BroadcastRun,
     ) -> BroadcastStageReturnType {
+        let coding_generator = CodingGenerator::default();
+
         let mut broadcast = Broadcast {
+            coding_generator,
             thread_pool: rayon::ThreadPoolBuilder::new()
                 .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
                 .build()


@@ -1,6 +1,6 @@
 use crate::entry::Entry;
 use crate::entry::EntrySlice;
-use crate::erasure;
+use crate::erasure::CodingGenerator;
 use crate::packet::{self, SharedBlob};
 use crate::poh_recorder::WorkingBankEntries;
 use crate::result::Result;
@@ -81,7 +81,7 @@ pub(super) fn entries_to_blobs(
     last_tick: u64,
     bank: &Bank,
     keypair: &Keypair,
-    set_index: &mut u64,
+    coding_generator: &mut CodingGenerator,
 ) -> (Vec<SharedBlob>, Vec<SharedBlob>) {
     let blobs = generate_data_blobs(
         ventries,
@@ -92,18 +92,7 @@
         &keypair,
     );

-    let start_index = blobs[0].read().unwrap().index();
-    let coding = generate_coding_blobs(
-        &blobs,
-        &thread_pool,
-        bank.slot(),
-        *set_index,
-        start_index,
-        &keypair,
-    );
-
-    *set_index += 1;
+    let coding = generate_coding_blobs(&blobs, &thread_pool, coding_generator, &keypair);

     (blobs, coding)
 }
@@ -152,15 +141,10 @@ pub(super) fn generate_data_blobs(
 pub(super) fn generate_coding_blobs(
     blobs: &[SharedBlob],
     thread_pool: &ThreadPool,
-    slot: u64,
-    set_index: u64,
-    start_index: u64,
+    coding_generator: &mut CodingGenerator,
     keypair: &Keypair,
 ) -> Vec<SharedBlob> {
-    let set_len = blobs.len();
-
-    let coding = erasure::encode_shared(slot, set_index, start_index, blobs, set_len)
-        .expect("Erasure coding failed");
+    let coding = coding_generator.next(&blobs);

     thread_pool.install(|| {
         coding.par_iter().for_each(|c| {


@@ -1,13 +1,11 @@
 use super::*;
 use solana_sdk::hash::Hash;

-pub(super) struct FailEntryVerificationBroadcastRun {
-    set_index: u64,
-}
+pub(super) struct FailEntryVerificationBroadcastRun {}

 impl FailEntryVerificationBroadcastRun {
     pub(super) fn new() -> Self {
-        Self { set_index: 0 }
+        Self {}
     }
 }
@@ -51,7 +49,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             last_tick,
             &bank,
             &keypair,
-            &mut self.set_index,
+            &mut broadcast.coding_generator,
         );

         blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;


@@ -10,14 +10,12 @@ struct BroadcastStats {

 pub(super) struct StandardBroadcastRun {
     stats: BroadcastStats,
-    set_index: u64,
 }

 impl StandardBroadcastRun {
     pub(super) fn new() -> Self {
         Self {
             stats: BroadcastStats::default(),
-            set_index: 0,
         }
     }
 }
@@ -81,7 +79,7 @@ impl BroadcastRun for StandardBroadcastRun {
             last_tick,
             &bank,
             &keypair,
-            &mut self.set_index,
+            &mut broadcast.coding_generator,
         );

         blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;


@@ -133,7 +133,7 @@ mod tests {
         hasher.hash(&buf[..size]);

         // golden needs to be updated if blob stuff changes....
-        let golden: Hash = "AMKCEbK6txetPPEQ8JDVdrqpefgtGRzN6ng2gSmKy6Fv"
+        let golden: Hash = "E2HZjSC6VgH4nmEiTbMDATTeBcFjwSYz7QYvU7doGNhD"
             .parse()
             .unwrap();


@@ -4,7 +4,8 @@ use crate::crds_value::EpochSlots;
 use crate::result::Result;
 use crate::service::Service;
 use byteorder::{ByteOrder, LittleEndian};
-use rand::{seq::SliceRandom, Rng, SeedableRng};
+use rand::seq::SliceRandom;
+use rand::SeedableRng;
 use rand_chacha::ChaChaRng;
 use solana_metrics::datapoint;
 use solana_runtime::epoch_schedule::EpochSchedule;
@@ -271,8 +272,6 @@ impl ClusterInfoRepairListener {
         let mut total_data_blobs_sent = 0;
         let mut total_coding_blobs_sent = 0;
         let mut num_slots_repaired = 0;
-        let mut rng = rand::thread_rng();
-
         let max_confirmed_repairee_epoch =
             epoch_schedule.get_stakers_epoch(repairee_epoch_slots.root);
         let max_confirmed_repairee_slot =
@@ -306,38 +305,20 @@ impl ClusterInfoRepairListener {
                     // a database iterator over the slots because by the time this node is
                     // sending the blobs in this slot for repair, we expect these slots
                     // to be full.
-                    if let Some(data_blob) = blocktree
-                        .get_data_blob(slot, blob_index as u64)
-                        .expect("Failed to read data blob from blocktree")
-                    {
-                        socket.send_to(&data_blob.data[..], repairee_tvu)?;
-                        total_data_blobs_sent += 1;
-
-                        if let Some(coding_header) = data_blob.get_coding_header() {
-                            let ratio = std::cmp::max(
-                                1,
-                                coding_header.parity_count / coding_header.data_count,
-                            );
-
-                            for _ in 0..ratio {
-                                let chosen_index =
-                                    rng.gen_range(0, coding_header.parity_count as u64);
-
-                                let coding_blob_opt = blocktree
-                                    .get_coding_blob(
-                                        slot,
-                                        coding_header.set_index,
-                                        chosen_index,
-                                    )
-                                    .expect("Failed to read coding blob from blocktree");
-
-                                if let Some(coding_blob) = coding_blob_opt {
-                                    socket.send_to(&coding_blob.data[..], repairee_tvu)?;
-                                    total_coding_blobs_sent += 1;
-                                }
-                            }
-                        }
-                    }
+                    if let Some(blob_data) = blocktree
+                        .get_data_blob_bytes(slot, blob_index as u64)
+                        .expect("Failed to read data blob from blocktree")
+                    {
+                        socket.send_to(&blob_data[..], repairee_tvu)?;
+                        total_data_blobs_sent += 1;
+                    }
+
+                    if let Some(coding_bytes) = blocktree
+                        .get_coding_blob_bytes(slot, blob_index as u64)
+                        .expect("Failed to read coding blob from blocktree")
+                    {
+                        socket.send_to(&coding_bytes[..], repairee_tvu)?;
+                        total_coding_blobs_sent += 1;
+                    }
                 }
             }


@@ -41,261 +41,237 @@
 //!
 //!
 use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
-use std::{
-    borrow::BorrowMut,
-    sync::{Arc, RwLock},
-};
+use std::cmp;
+use std::convert::AsMut;
+use std::sync::{Arc, RwLock};

 use reed_solomon_erasure::ReedSolomon;

-/// Max number of data blobs in an erasure set; Also max number of parity blobs.
-pub const MAX_SET_SIZE: usize = 255;
+//TODO(sakridge) pick these values
+/// Number of data blobs
+pub const NUM_DATA: usize = 8;
+/// Number of coding blobs; also the maximum number that can go missing.
+pub const NUM_CODING: usize = 8;
+/// Total number of blobs in an erasure set; includes data and coding blobs
+pub const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;

 type Result<T> = std::result::Result<T, reed_solomon_erasure::Error>;

-/// This struct is stored in the header of any data blob that has been encoded
-/// Every coding blob contains it.
-#[derive(Clone, Copy, Default, Debug, PartialEq, Serialize, Deserialize)]
-pub struct CodingHeader {
-    /// Index of the first data blob in the set
-    pub start_index: u64,
-    /// Index of the erasure set. Slot relative.
-    pub set_index: u64,
-    /// Number of data blobs in the set.
-    pub data_count: usize,
-    /// Number of parity blobs in the set.
-    pub parity_count: usize,
-    /// Size of the largest data blob in the set, including the header.
-    pub shard_size: usize,
-}
+/// Represents an erasure "session" with a particular configuration and number of data and coding
+/// blobs
+#[derive(Debug, Clone)]
+pub struct Session(ReedSolomon);

-impl CodingHeader {
-    /// returns the set-relative index of the blob with the given index.
-    pub fn data_index_in_set(&self, index: u64) -> u64 {
-        index - self.start_index
-    }
-
-    /// returns the set-relative index of the coding blob with the given index.
-    /// in the context of erasure/recovery coding blobs come after data-blobs.
-    pub fn coding_index_in_set(&self, index: u64) -> u64 {
-        index + self.data_count as u64
-    }
-
-    /// returns the end boundary indexes of the data and coding blobs in this set, respectively.
-    pub fn end_indexes(&self) -> (u64, u64) {
-        let start = self.start_index;
-        (start + self.data_count as u64, self.parity_count as u64)
-    }
-}
+/// Generates coding blobs on demand given data blobs
+#[derive(Debug, Clone)]
+pub struct CodingGenerator {
+    /// SharedBlobs that couldn't be used in last call to next()
+    leftover: Vec<SharedBlob>,
+    session: Arc<Session>,
+}

-/// Erasure code data blobs.
-///
-/// # Arguments
-///
-/// * `slot` - slot all blobs belong to
-/// * `set_index` - index of the erasure set being encoded
-/// * `start_index` - index of the first data blob
-/// * `blobs` - data blobs to be encoded. an amount greater than `MAX_SET_SIZE` causes errors.
-/// * `parity` - number of parity blobs to create. values greater than `MAX_SET_SIZE` cause errors.
-pub fn encode<B: BorrowMut<Blob>>(
-    slot: u64,
-    set_index: u64,
-    start_index: u64,
-    blobs: &mut [B],
-    parity: usize,
-) -> Result<Vec<Blob>> {
-    let data = blobs.len();
-    // this would fail if there are too few or too many blobs
-    let rs = ReedSolomon::new(data, parity)?;
-    let mut header = CodingHeader {
-        data_count: data,
-        parity_count: parity,
-        start_index,
-        set_index,
-        shard_size: 0,
-    };
-
-    let shard_size = blobs
-        .iter_mut()
-        .map(|blob| (*blob).borrow().data_size() as usize)
-        .max()
-        .expect("must be >=1 blobs");
-
-    //header.shard_size = crate::packet::BLOB_DATA_SIZE;
-    header.shard_size = shard_size;
-
-    let slices = blobs
-        .iter_mut()
-        .map(|b| {
-            let blob: &mut Blob = b.borrow_mut();
-            blob.set_coding_header(&header);
-            &blob.data[..shard_size]
-        })
-        .collect::<Vec<_>>();
-
-    let mut parity_blocks = (0..parity).map(|_| vec![0; shard_size]).collect::<Vec<_>>();
-    let mut parity_slices = parity_blocks
-        .iter_mut()
-        .map(|v| &mut v[..])
-        .collect::<Vec<_>>();
-
-    rs.encode_sep(&slices[..], &mut parity_slices[..])?;
-
-    let parity = parity_blocks
-        .into_iter()
-        .enumerate()
-        .map(|(idx, block)| {
-            let mut blob = Blob::default();
-            (&mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + shard_size])
-                .copy_from_slice(&block);
-            blob.set_slot(slot);
-            blob.set_size(shard_size - BLOB_HEADER_SIZE);
-            //blob.set_data_size(shard_size as u64);
-            blob.set_coding();
-            blob.set_coding_header(&header);
-            blob.set_index(idx as u64);
-
-            blob
-        })
-        .collect();
-
-    Ok(parity)
-}
-
-/// See `encode`.
-/// Convenience function to encode and return `Arc<RwLock<Blob>>`s
-pub fn encode_shared(
-    slot: u64,
-    set_index: u64,
-    start_index: u64,
-    blobs: &[SharedBlob],
-    parity: usize,
-) -> Result<Vec<SharedBlob>> {
-    let mut locks = blobs
-        .iter()
-        .map(|shared_blob| shared_blob.write().unwrap())
-        .collect::<Vec<_>>();
-
-    let mut blobs = locks.iter_mut().map(|lock| &mut **lock).collect::<Vec<_>>();
-
-    let parity_blobs = encode(slot, set_index, start_index, &mut blobs[..], parity)?
-        .into_iter()
-        .map(|blob| Arc::new(RwLock::new(blob)))
-        .collect();
-
-    Ok(parity_blobs)
-}
-
-/// Attempt to recover missing blobs
-/// # Arguments
-/// * `info` - the encoding parameters for this erasure set
-/// * `slot` - the slot that these blobs belong to
-/// * `blobs` - data blobs, followed by parity blobs. blobs must be in order or the recovery will
-///             succeed but return garbage.
-/// * `present` - each element indicates the presence of the blob with the same set-relative index
-pub fn decode<B>(
-    info: &CodingHeader,
-    slot: u64,
-    blobs: &mut [B],
-    present: &[bool],
-) -> Result<(Vec<Blob>, Vec<Blob>)>
-where
-    B: BorrowMut<Blob>,
-{
-    let rs = ReedSolomon::new(info.data_count as usize, info.parity_count as usize)?;
-
-    let mut blocks = vec![];
-
-    for (idx, blob) in blobs.iter_mut().enumerate() {
-        if idx < info.data_count {
-            blocks.push(&mut blob.borrow_mut().data[..info.shard_size as usize]);
-        } else {
-            blocks.push(
-                &mut blob.borrow_mut().data
-                    [BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + info.shard_size as usize],
-            );
-        }
-    }
-
-    assert_eq!(
-        blocks.len(),
-        rs.data_shard_count() + rs.parity_shard_count()
-    );
-
-    rs.reconstruct(&mut blocks[..], present)?;
-
-    let mut recovered_data = vec![];
-    let mut recovered_coding = vec![];
-
-    let erasures = present
-        .iter()
-        .enumerate()
-        .filter_map(|(i, present)| if *present { None } else { Some(i) });
-
-    let shard_size = info.shard_size as usize;
-
-    // Create the missing blobs from the reconstructed data
-    for n in erasures {
-        let data_size;
-        let idx;
-        let first_byte;
-
-        if n < info.data_count {
-            let mut blob: Box<Blob> = Box::default();
-            (&mut blob.data[..shard_size]).copy_from_slice(&blocks[n]);
-            data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
-            idx = n as u64 + info.start_index;
-            first_byte = blob.data[0];
-
-            blob.set_slot(slot);
-            blob.set_index(idx);
-            blob.set_size(data_size);
-            blob.set_coding_header(info);
-            recovered_data.push(*blob);
-        } else {
-            let mut blob = Blob::default();
-            (&mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + shard_size])
-                .copy_from_slice(&blocks[n]);
-            data_size = shard_size;
-            idx = (n - info.data_count) as u64;
-            first_byte = blob.data[0];
-
-            blob.set_slot(slot);
-            blob.set_index(idx);
-            blob.set_size(data_size);
-            blob.set_coding_header(info);
-            recovered_coding.push(blob);
-        }
-
-        trace!(
-            "[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}",
-            n,
-            idx,
-            data_size,
-            first_byte
-        );
-    }
-
-    Ok((recovered_data, recovered_coding))
-}
-
-/// See `decode`
-/// Convenience function to accept and return `Arc<RwLock<Blob>>`s
-pub fn decode_shared(
-    info: &CodingHeader,
-    slot: u64,
-    blobs: &[SharedBlob],
-    present: &[bool],
-) -> Result<(Vec<Blob>, Vec<Blob>)> {
-    let mut locks = blobs
-        .iter()
-        .map(|shared_blob| shared_blob.write().unwrap())
-        .collect::<Vec<_>>();
-
-    let mut blobs = locks.iter_mut().map(|lock| &mut **lock).collect::<Vec<_>>();
-
-    decode(info, slot, &mut blobs[..], present)
-}
+impl Session {
+    pub fn new(data_count: usize, coding_count: usize) -> Result<Session> {
+        let rs = ReedSolomon::new(data_count, coding_count)?;
+
+        Ok(Session(rs))
+    }
+
+    /// Create coding blocks by overwriting `parity`
+    pub fn encode(&self, data: &[&[u8]], parity: &mut [&mut [u8]]) -> Result<()> {
+        self.0.encode_sep(data, parity)?;
+
+        Ok(())
+    }
+
+    /// Recover data + coding blocks into data blocks
+    /// # Arguments
+    /// * `data` - array of data blocks to recover into
+    /// * `coding` - array of coding blocks
+    /// * `erasures` - list of indices in data where blocks should be recovered
+    pub fn decode_blocks(&self, blocks: &mut [&mut [u8]], present: &[bool]) -> Result<()> {
+        self.0.reconstruct(blocks, present)?;
+
+        Ok(())
+    }
+
+    /// Returns `(number_of_data_blobs, number_of_coding_blobs)`
+    pub fn dimensions(&self) -> (usize, usize) {
+        (self.0.data_shard_count(), self.0.parity_shard_count())
+    }
+
+    /// Reconstruct any missing blobs in this erasure set if possible
+    /// Re-indexes any coding blobs that have been reconstructed and fixes up size in metadata
+    /// Assumes that the user has sliced into the blobs appropriately already. else recovery will
+    /// return an error or garbage data
+    pub fn reconstruct_blobs<B>(
+        &self,
+        blobs: &mut [B],
+        present: &[bool],
+        size: usize,
+        block_start_idx: u64,
+        slot: u64,
+    ) -> Result<(Vec<Blob>, Vec<Blob>)>
+    where
+        B: AsMut<[u8]>,
+    {
+        let mut blocks: Vec<&mut [u8]> = blobs.iter_mut().map(AsMut::as_mut).collect();
+
+        trace!("[reconstruct_blobs] present: {:?}, size: {}", present, size,);
+
+        // Decode the blocks
+        self.decode_blocks(blocks.as_mut_slice(), &present)?;
+
+        let mut recovered_data = vec![];
+        let mut recovered_coding = vec![];
+
+        let erasures = present
+            .iter()
+            .enumerate()
+            .filter_map(|(i, present)| if *present { None } else { Some(i) });
+
+        // Create the missing blobs from the reconstructed data
+        for n in erasures {
+            let data_size;
+            let idx;
+            let first_byte;
+
+            if n < NUM_DATA {
+                let mut blob = Blob::new(&blocks[n]);
+
+                data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
+                idx = n as u64 + block_start_idx;
+                first_byte = blob.data[0];
+
+                blob.set_size(data_size);
+                recovered_data.push(blob);
+            } else {
+                let mut blob = Blob::default();
+                blob.data_mut()[..size].copy_from_slice(&blocks[n]);
+                data_size = size;
+                idx = (n as u64 + block_start_idx) - NUM_DATA as u64;
+                first_byte = blob.data[0];
+
+                blob.set_slot(slot);
+                blob.set_index(idx);
+                blob.set_size(data_size);
+                recovered_coding.push(blob);
+            }
+
+            trace!(
+                "[reconstruct_blobs] erasures[{}] ({}) data_size: {} data[0]: {}",
+                n,
+                idx,
+                data_size,
+                first_byte
+            );
+        }
+
+        Ok((recovered_data, recovered_coding))
+    }
+}
+
+impl CodingGenerator {
+    pub fn new(session: Arc<Session>) -> Self {
+        CodingGenerator {
+            leftover: Vec::with_capacity(session.0.data_shard_count()),
+            session,
+        }
+    }
+
+    /// Yields next set of coding blobs, if any.
+    /// Must be called with consecutive data blobs within a slot.
+    ///
+    /// Passing in a slice with the first blob having a new slot will cause internal state to
+    /// reset, so the above concern does not apply to slot boundaries, only indexes within a slot
+    /// must be consecutive.
+    ///
+    /// If used improperly, it may return garbage coding blobs, but will not give an
+    /// error.
+    pub fn next(&mut self, next_data: &[SharedBlob]) -> Vec<SharedBlob> {
+        let (num_data, num_coding) = self.session.dimensions();
+        let mut next_coding =
+            Vec::with_capacity((self.leftover.len() + next_data.len()) / num_data * num_coding);
+
+        if !self.leftover.is_empty()
+            && !next_data.is_empty()
+            && self.leftover[0].read().unwrap().slot() != next_data[0].read().unwrap().slot()
+        {
+            self.leftover.clear();
+        }
+
+        let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect();
+
+        for data_blobs in next_data.chunks(num_data) {
+            if data_blobs.len() < num_data {
+                self.leftover = data_blobs.to_vec();
+                break;
+            }
+            self.leftover.clear();
+
+            // find max_data_size for the erasure set
+            let max_data_size = data_blobs
+                .iter()
+                .fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max));
+
+            let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
+            let data_ptrs: Vec<_> = data_locks
+                .iter()
+                .map(|l| &l.data[..max_data_size])
+                .collect();
+
+            let mut coding_blobs = Vec::with_capacity(num_coding);
+
+            for data_blob in &data_locks[..num_coding] {
+                let index = data_blob.index();
+                let slot = data_blob.slot();
+                let id = data_blob.id();
+
+                let mut coding_blob = Blob::default();
+                coding_blob.set_index(index);
+                coding_blob.set_slot(slot);
+                coding_blob.set_id(&id);
+                coding_blob.set_size(max_data_size);
+                coding_blob.set_coding();
+
+                coding_blobs.push(coding_blob);
+            }
+
+            if {
+                let mut coding_ptrs: Vec<_> = coding_blobs
+                    .iter_mut()
+                    .map(|blob| &mut blob.data_mut()[..max_data_size])
+                    .collect();
+
+                self.session.encode(&data_ptrs, coding_ptrs.as_mut_slice())
+            }
+            .is_ok()
+            {
+                next_coding.append(&mut coding_blobs);
+            }
+        }
+
+        next_coding
+            .into_iter()
+            .map(|blob| Arc::new(RwLock::new(blob)))
+            .collect()
+    }
+}
+
+impl Default for Session {
+    fn default() -> Session {
+        Session::new(NUM_DATA, NUM_CODING).unwrap()
+    }
+}
+
+impl Default for CodingGenerator {
+    fn default() -> Self {
+        let session = Session::default();
+        CodingGenerator {
+            leftover: Vec::with_capacity(session.0.data_shard_count()),
+            session: Arc::new(session),
+        }
+    }
+}

 #[cfg(test)]
@@ -309,10 +285,6 @@ pub mod test {
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use std::borrow::Borrow;

-    const NUM_DATA: usize = 8;
-    const NUM_CODING: usize = 9;
-    const ERASURE_SET_SIZE: usize = NUM_DATA + NUM_CODING;
-
     /// Specifies the contents of a 16-data-blob and 4-coding-blob erasure set
     /// Exists to be passed to `generate_blocktree_with_coding`
     #[derive(Debug, Copy, Clone)]
@@ -321,8 +293,6 @@ pub mod test {
         pub set_index: u64,
         pub num_data: usize,
         pub num_coding: usize,
-        pub data_count: usize,
-        pub parity_count: usize,
     }

     /// Specifies the contents of a slot
@@ -348,54 +318,107 @@ pub mod test {
         pub start_index: u64,
         pub coding: Vec<SharedBlob>,
         pub data: Vec<SharedBlob>,
-        pub data_count: usize,
-        pub parity_count: usize,
     }

-    fn test_toss_and_recover(slot: u64, data_blobs: &[SharedBlob], coding_blobs: &[SharedBlob]) {
-        let mut blobs: Vec<SharedBlob> = Vec::with_capacity(ERASURE_SET_SIZE);
-
-        blobs.push(SharedBlob::default()); // empty data, erasure at zero
-        for blob in &data_blobs[1..] {
-            // skip first blob
-            blobs.push(blob.clone());
-        }
-
-        blobs.push(SharedBlob::default()); // empty coding, erasure at zero
-        for blob in &coding_blobs[1..] {
-            blobs.push(blob.clone());
-        }
-
-        let info = coding_blobs[0]
-            .read()
-            .unwrap()
-            .get_coding_header()
-            .expect("coding info");
-
-        // toss one data and one coding
-        let mut present = vec![true; blobs.len()];
-        present[0] = false;
-        present[data_blobs.len()] = false;
-
-        let (recovered_data, recovered_coding) =
-            decode_shared(&info, slot, &mut blobs[..], &present)
-                .expect("reconstruction must succeed");
-
-        assert_eq!(recovered_data.len(), 1);
-        assert_eq!(recovered_coding.len(), 1);
-
-        assert_eq!(
-            blobs[1].read().unwrap().meta,
-            data_blobs[1].read().unwrap().meta
-        );
-        assert_eq!(
-            blobs[1].read().unwrap().data(),
-            data_blobs[1].read().unwrap().data()
-        );
-        assert_eq!(recovered_data[0].meta, data_blobs[0].read().unwrap().meta);
-        assert_eq!(
-            recovered_data[0].data(),
-            data_blobs[0].read().unwrap().data()
-        );
+    #[test]
+    fn test_coding() {
+        const N_DATA: usize = 4;
+        const N_CODING: usize = 2;
+
+        let session = Session::new(N_DATA, N_CODING).unwrap();
+
+        let mut vs: Vec<Vec<u8>> = (0..N_DATA as u8).map(|i| (i..(16 + i)).collect()).collect();
+        let v_orig: Vec<u8> = vs[0].clone();
+
+        let mut coding_blocks: Vec<_> = (0..N_CODING).map(|_| vec![0u8; 16]).collect();
+
+        let mut coding_blocks_slices: Vec<_> =
+            coding_blocks.iter_mut().map(Vec::as_mut_slice).collect();
+        let v_slices: Vec<_> = vs.iter().map(Vec::as_slice).collect();
+
+        session
+            .encode(v_slices.as_slice(), coding_blocks_slices.as_mut_slice())
+            .expect("encoding must succeed");
+
+        trace!("test_coding: coding blocks:");
+        for b in &coding_blocks {
+            trace!("test_coding: {:?}", b);
+        }
+
+        let erasure: usize = 1;
+        let present = &mut [true; N_DATA + N_CODING];
+        present[erasure] = false;
+        let erased = vs[erasure].clone();
+
+        // clear an entry
+        vs[erasure as usize].copy_from_slice(&[0; 16]);
+
+        let mut blocks: Vec<_> = vs
+            .iter_mut()
+            .chain(coding_blocks.iter_mut())
+            .map(Vec::as_mut_slice)
+            .collect();
+
+        session
+            .decode_blocks(blocks.as_mut_slice(), present)
+            .expect("decoding must succeed");
+
+        trace!("test_coding: vs:");
+        for v in &vs {
+            trace!("test_coding: {:?}", v);
+        }
+
+        assert_eq!(v_orig, vs[0]);
+        assert_eq!(erased, vs[erasure]);
+    }
+
+    fn test_toss_and_recover(
+        session: &Session,
+        data_blobs: &[SharedBlob],
+        coding_blobs: &[SharedBlob],
+        block_start_idx: usize,
+    ) {
+        let size = coding_blobs[0].read().unwrap().size();
+
+        let mut blobs: Vec<SharedBlob> = Vec::with_capacity(ERASURE_SET_SIZE);
+
+        blobs.push(SharedBlob::default()); // empty data, erasure at zero
+        for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] {
+            // skip first blob
+            blobs.push(blob.clone());
+        }
+        blobs.push(SharedBlob::default()); // empty coding, erasure at zero
+        for blob in &coding_blobs[1..NUM_CODING] {
+            blobs.push(blob.clone());
+        }
+
+        // toss one data and one coding
+        let mut present = vec![true; blobs.len()];
+        present[0] = false;
+        present[NUM_DATA] = false;
+
+        let (recovered_data, recovered_coding) = session
+            .reconstruct_shared_blobs(&mut blobs, &present, size, block_start_idx as u64, 0)
+            .expect("reconstruction must succeed");
+
+        assert_eq!(recovered_data.len(), 1);
+        assert_eq!(recovered_coding.len(), 1);
+
+        assert_eq!(
+            blobs[1].read().unwrap().meta,
+            data_blobs[block_start_idx + 1].read().unwrap().meta
+        );
+        assert_eq!(
+            blobs[1].read().unwrap().data(),
+            data_blobs[block_start_idx + 1].read().unwrap().data()
+        );
+        assert_eq!(
+            recovered_data[0].meta,
+            data_blobs[block_start_idx].read().unwrap().meta
+        );
+        assert_eq!(
+            recovered_data[0].data(),
+            data_blobs[block_start_idx].read().unwrap().data()
+        );
         assert_eq!(
             recovered_coding[0].data(),
@@ -405,27 +428,72 @@ pub mod test {
     #[test]
     fn test_erasure_generate_coding() {
+        solana_logger::setup();
+
+        // trivial case
+        let mut coding_generator = CodingGenerator::default();
+        let blobs = Vec::new();
+        for _ in 0..NUM_DATA * 2 {
+            let coding = coding_generator.next(&blobs);
+            assert!(coding.is_empty());
+        }
+
         // test coding by iterating one blob at a time
-        let test_blobs = generate_shared_test_blobs(0, NUM_DATA * 2);
-
-        for (idx, data_blobs) in test_blobs.chunks_exact(NUM_DATA).enumerate() {
-            let coding_blobs = encode_shared(
-                0,
-                idx as u64,
-                (idx * NUM_DATA) as u64,
-                &data_blobs[..],
-                NUM_CODING,
-            )
-            .unwrap();
-
-            test_toss_and_recover(0, &data_blobs, &coding_blobs);
+        let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
+
+        for (i, blob) in data_blobs.iter().cloned().enumerate() {
+            let coding_blobs = coding_generator.next(&[blob]);
+
+            if !coding_blobs.is_empty() {
+                assert_eq!(i % NUM_DATA, NUM_DATA - 1);
+                assert_eq!(coding_blobs.len(), NUM_CODING);
+
+                for j in 0..NUM_CODING {
+                    assert_eq!(
+                        coding_blobs[j].read().unwrap().index(),
+                        ((i / NUM_DATA) * NUM_DATA + j) as u64
+                    );
+                }
+                test_toss_and_recover(
+                    &coding_generator.session,
+                    &data_blobs,
+                    &coding_blobs,
+                    i - (i % NUM_DATA),
+                );
+            }
         }
     }

     #[test]
-    fn test_erasure_generate_blocktree_with_coding() {
+    fn test_erasure_generate_coding_reset_on_new_slot() {
         solana_logger::setup();

+        let mut coding_generator = CodingGenerator::default();
+
+        // test coding by iterating one blob at a time
+        let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
+
+        for i in NUM_DATA..NUM_DATA * 2 {
+            data_blobs[i].write().unwrap().set_slot(1);
+        }
+
+        let coding_blobs = coding_generator.next(&data_blobs[0..NUM_DATA - 1]);
+        assert!(coding_blobs.is_empty());
+
+        let coding_blobs = coding_generator.next(&data_blobs[NUM_DATA..]);
+
+        assert_eq!(coding_blobs.len(), NUM_CODING);
+
+        test_toss_and_recover(
+            &coding_generator.session,
+            &data_blobs,
+            &coding_blobs,
+            NUM_DATA,
+        );
+    }
+
+    #[test]
+    fn test_erasure_generate_blocktree_with_coding() {
         let cases = vec![
             (NUM_DATA, NUM_CODING, 7, 5),
             (NUM_DATA - 6, NUM_CODING - 1, 5, 7),
@@ -441,8 +509,6 @@ pub mod test {
                     set_index,
                     num_data,
                     num_coding,
-                    data_count: NUM_DATA,
-                    parity_count: NUM_CODING,
                 })
                 .collect();
@@ -457,17 +523,18 @@
             for erasure_spec in spec.set_specs.iter() {
                 let start_index = erasure_spec.set_index * NUM_DATA as u64;
-                let data_end = start_index + erasure_spec.num_data as u64;
+                let (data_end, coding_end) = (
+                    start_index + erasure_spec.num_data as u64,
+                    start_index + erasure_spec.num_coding as u64,
+                );

                 for idx in start_index..data_end {
                     let opt_bytes = blocktree.get_data_blob_bytes(slot, idx).unwrap();
                     assert!(opt_bytes.is_some());
                 }

-                for idx in 0..erasure_spec.num_coding {
-                    let opt_bytes = blocktree
-                        .get_coding_blob_bytes(slot, erasure_spec.set_index, idx as u64)
-                        .unwrap();
+                for idx in start_index..coding_end {
+                    let opt_bytes = blocktree.get_coding_blob_bytes(slot, idx).unwrap();
                     assert!(opt_bytes.is_some());
                 }
             }
@@ -480,7 +547,10 @@
     #[test]
     fn test_recovery_with_model() {
+        use std::thread;
+
         const MAX_ERASURE_SETS: u64 = 16;
+        const N_THREADS: usize = 2;
         const N_SLOTS: u64 = 10;

         solana_logger::setup();
@@ -493,66 +563,85 @@
                     set_index,
                     num_data: NUM_DATA,
                     num_coding: NUM_CODING,
-                    parity_count: NUM_CODING,
-                    data_count: NUM_DATA,
                 })
                 .collect();

             SlotSpec { slot, set_specs }
         });

-        for slot_model in generate_ledger_model(specs) {
-            for erasure_set in slot_model.chunks {
-                let erased_coding = erasure_set.coding[0].clone();
-                let erased_data = erasure_set.data[..3].to_vec();
-                let info = erasure_set.coding[0]
-                    .read()
-                    .unwrap()
-                    .get_coding_header()
-                    .expect("coding info");
-
-                let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE);
-
-                blobs.push(SharedBlob::default());
-                blobs.push(SharedBlob::default());
-                blobs.push(SharedBlob::default());
-                for blob in erasure_set.data.into_iter().skip(3) {
-                    blobs.push(blob);
-                }
-
-                blobs.push(SharedBlob::default());
-                for blob in erasure_set.coding.into_iter().skip(1) {
-                    blobs.push(blob);
-                }
-
-                let mut present = vec![true; ERASURE_SET_SIZE];
-                present[0] = false;
-                present[1] = false;
-                present[2] = false;
-                present[NUM_DATA] = false;
-
-                decode_shared(&info, slot_model.slot, &mut blobs, &present)
-                    .expect("reconstruction must succeed");
-
-                for (expected, recovered) in erased_data.iter().zip(blobs.iter()) {
-                    let expected = expected.read().unwrap();
-                    let mut recovered = recovered.write().unwrap();
-                    let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE;
-                    recovered.set_size(data_size);
-                    let corrupt = data_size > BLOB_DATA_SIZE;
-                    assert!(!corrupt, "CORRUPTION {}", data_size);
-                    assert_eq!(expected.data(), recovered.data());
-                }
-
-                assert_eq!(
-                    erased_coding.read().unwrap().data(),
-                    blobs[NUM_DATA].read().unwrap().data()
-                );
-
-                debug!("passed set: {}", erasure_set.set_index);
-            }
-            debug!("passed slot: {}", slot_model.slot);
-        }
+        let mut handles = vec![];
+        let session = Arc::new(Session::default());
+
+        for i in 0..N_THREADS {
+            let specs = specs.clone();
+            let session = Arc::clone(&session);
+
+            let handle = thread::Builder::new()
+                .name(i.to_string())
+                .spawn(move || {
+                    for slot_model in generate_ledger_model(specs) {
+                        for erasure_set in slot_model.chunks {
+                            let erased_coding = erasure_set.coding[0].clone();
+                            let erased_data = erasure_set.data[..3].to_vec();
+
+                            let mut blobs = Vec::with_capacity(ERASURE_SET_SIZE);
+
+                            blobs.push(SharedBlob::default());
+                            blobs.push(SharedBlob::default());
+                            blobs.push(SharedBlob::default());
+                            for blob in erasure_set.data.into_iter().skip(3) {
+                                blobs.push(blob);
+                            }
+
+                            blobs.push(SharedBlob::default());
+                            for blob in erasure_set.coding.into_iter().skip(1) {
+                                blobs.push(blob);
+                            }
+
+                            let size = erased_coding.read().unwrap().size() as usize;
+
+                            let mut present = vec![true; ERASURE_SET_SIZE];
+                            present[0] = false;
+                            present[1] = false;
+                            present[2] = false;
+                            present[NUM_DATA] = false;
+
+                            session
+                                .reconstruct_shared_blobs(
+                                    &mut blobs,
+                                    &present,
+                                    size,
+                                    erasure_set.set_index * NUM_DATA as u64,
+                                    slot_model.slot,
+                                )
+                                .expect("reconstruction must succeed");
+
+                            for (expected, recovered) in erased_data.iter().zip(blobs.iter()) {
+                                let expected = expected.read().unwrap();
+                                let mut recovered = recovered.write().unwrap();
+                                let data_size = recovered.data_size() as usize - BLOB_HEADER_SIZE;
+                                recovered.set_size(data_size);
+                                let corrupt = data_size > BLOB_DATA_SIZE;
+                                assert!(!corrupt, "CORRUPTION");
+                                assert_eq!(&*expected, &*recovered);
+                            }
+
+                            assert_eq!(
+                                erased_coding.read().unwrap().data(),
+                                blobs[NUM_DATA].read().unwrap().data()
+                            );
+
+                            debug!("passed set: {}", erasure_set.set_index);
+                        }
+                        debug!("passed slot: {}", slot_model.slot);
+                    }
+                })
+                .expect("thread build error");
+
+            handles.push(handle);
+        }
+
+        handles.into_iter().for_each(|h| h.join().unwrap());
     }

     /// Generates a model of a ledger containing certain data and coding blobs according to a spec
@@ -564,6 +653,8 @@
         IntoIt: Iterator<Item = S> + Clone + 'a,
         S: Borrow<SlotSpec>,
     {
+        let mut coding_generator = CodingGenerator::default();
+
         specs.into_iter().map(move |spec| {
             let spec = spec.borrow();
             let slot = spec.slot;
@@ -574,10 +665,8 @@
                 .map(|erasure_spec| {
                     let set_index = erasure_spec.set_index as usize;
                     let start_index = set_index * NUM_DATA;
-                    let (parity_count, data_count) =
-                        (erasure_spec.parity_count, erasure_spec.data_count);

-                    let mut blobs = generate_shared_test_blobs(0, data_count);
+                    let mut blobs = generate_test_blobs(0, NUM_DATA);
                     index_blobs(
                         &blobs,
                         &Keypair::new().pubkey(),
@@ -586,14 +675,7 @@
                         0,
                     );

-                    let mut coding_blobs = encode_shared(
-                        slot,
-                        set_index as u64,
-                        start_index as u64,
-                        &blobs,
-                        parity_count,
-                    )
-                    .unwrap();
+                    let mut coding_blobs = coding_generator.next(&blobs);

                     blobs.drain(erasure_spec.num_data..);
                     coding_blobs.drain(erasure_spec.num_coding..);
@@ -603,8 +685,6 @@
                         set_index: set_index as u64,
                         data: blobs,
                         coding: coding_blobs,
-                        parity_count,
-                        data_count,
                     }
                 })
                 .collect();
@@ -631,7 +711,6 @@
                 blocktree
                     .put_coding_blob_bytes_raw(
                         slot,
-                        erasure_set.set_index,
                         blob.index(),
                         &blob.data[..blob.size() + BLOB_HEADER_SIZE],
                     )
@@ -643,7 +722,16 @@
         blocktree
     }

-    pub fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<Blob> {
+    // fn verify_test_blobs(offset: usize, blobs: &[SharedBlob]) -> bool {
+    //     let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();
+    //
+    //     blobs.iter().enumerate().all(|(i, blob)| {
+    //         let blob = blob.read().unwrap();
+    //         blob.index() as usize == i + offset && blob.data() == &data[..]
+    //     })
+    // }
+    //
+    fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
         let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();

         let blobs: Vec<_> = (0..num_blobs)
@@ -660,26 +748,35 @@
         index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0);

         blobs
-            .into_iter()
-            .map(|shared| shared.read().unwrap().clone())
-            .collect()
     }

-    pub fn generate_shared_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
-        let data: Vec<_> = (0..BLOB_DATA_SIZE).into_iter().map(|i| i as u8).collect();
-
-        let blobs: Vec<_> = (0..num_blobs)
-            .into_iter()
-            .map(|_| {
-                let mut blob = Blob::default();
-                blob.data_mut().copy_from_slice(&data);
-                blob.set_size(data.len());
-                Arc::new(RwLock::new(blob))
-            })
-            .collect();
-
-        index_blobs(&blobs, &Pubkey::new_rand(), offset as u64, 0, 0);
-
-        blobs
-    }
+    impl Session {
+        fn reconstruct_shared_blobs(
+            &self,
+            blobs: &mut [SharedBlob],
+            present: &[bool],
+            size: usize,
+            block_start_idx: u64,
+            slot: u64,
+        ) -> Result<(Vec<Blob>, Vec<Blob>)> {
+            let mut locks: Vec<std::sync::RwLockWriteGuard<_>> = blobs
+                .iter()
+                .map(|shared_blob| shared_blob.write().unwrap())
+                .collect();
+
+            let mut slices: Vec<_> = locks
+                .iter_mut()
+                .enumerate()
+                .map(|(i, blob)| {
+                    if i < NUM_DATA {
+                        &mut blob.data[..size]
+                    } else {
+                        &mut blob.data_mut()[..size]
+                    }
+                })
+                .collect();
+
+            self.reconstruct_blobs(&mut slices, present, size, block_start_idx, slot)
+        }
+    }
 }
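For context (not part of the commit): a usage sketch of the restored CodingGenerator, written as if it sat next to the tests above, since it leans on crate-internal types (Blob, SharedBlob, BLOB_DATA_SIZE). It mirrors how generate_test_blobs builds input and shows the buffering behavior of next():

fn coding_generator_sketch() {
    use std::sync::{Arc, RwLock};

    let mut generator = CodingGenerator::default();
    let data: Vec<_> = (0..BLOB_DATA_SIZE).map(|i| i as u8).collect();

    let make_blob = |index: u64| {
        let mut blob = Blob::default();
        blob.data_mut().copy_from_slice(&data);
        blob.set_size(data.len());
        blob.set_slot(0);
        blob.set_index(index);
        Arc::new(RwLock::new(blob))
    };

    // NUM_DATA - 1 blobs is one short of a full erasure set: nothing comes
    // back and the input is buffered in `leftover`.
    let partial: Vec<SharedBlob> = (0..NUM_DATA as u64 - 1).map(|i| make_blob(i)).collect();
    assert!(generator.next(&partial).is_empty());

    // One more consecutive blob completes the set and yields its coding blobs.
    let coding = generator.next(&[make_blob(NUM_DATA as u64 - 1)]);
    assert_eq!(coding.len(), NUM_CODING);
}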


@@ -1,9 +1,6 @@
 //! The `packet` module defines data structures and methods to pull data from the network.
-use crate::{
-    erasure::CodingHeader,
-    recvmmsg::{recv_mmsg, NUM_RCVMMSGS},
-    result::{Error, Result},
-};
+use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
+use crate::result::{Error, Result};
 use bincode;
 use byteorder::{ByteOrder, LittleEndian};
 use serde::Serialize;
@@ -336,7 +333,7 @@ pub fn packets_to_blobs<T: Borrow<Packet>>(packets: &[T]) -> Vec<Blob> {
 }

 macro_rules! range {
-    ($prev:expr, $type:ty) => {
+    ($prev:expr, $type:ident) => {
         $prev..$prev + size_of::<$type>()
     };
 }
@@ -346,13 +343,17 @@ const FORWARDED_RANGE: std::ops::Range<usize> = range!(SIGNATURE_RANGE.end, bool
 const PARENT_RANGE: std::ops::Range<usize> = range!(FORWARDED_RANGE.end, u64);
 const SLOT_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
 const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
-const CODING_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Option<CodingHeader>);
-const ID_RANGE: std::ops::Range<usize> = range!(CODING_RANGE.end, Pubkey);
+const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey);
 const FLAGS_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, u32);
 const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64);

-pub const BLOB_HEADER_SIZE: usize = SIZE_RANGE.end;
+macro_rules! align {
+    ($x:expr, $align:expr) => {
+        $x + ($align - 1) & !($align - 1)
+    };
+}
+
+pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, BLOB_DATA_ALIGN); // make sure data() is safe for erasure

 pub const SIGNABLE_START: usize = PARENT_RANGE.start;

 pub const BLOB_FLAG_IS_LAST_IN_SLOT: u32 = 0x2;
@@ -421,14 +422,6 @@ impl Blob {
         self.data[ID_RANGE].copy_from_slice(id.as_ref())
     }

-    pub fn set_coding_header(&mut self, header: &CodingHeader) {
-        bincode::serialize_into(&mut self.data[CODING_RANGE], &Some(*header)).unwrap();
-    }
-
-    pub fn get_coding_header(&self) -> Option<CodingHeader> {
-        bincode::deserialize(&self.data[CODING_RANGE]).unwrap()
-    }
-
     /// Used to determine whether or not this blob should be forwarded in retransmit
     /// A bool is used here instead of a flag because this item is not intended to be signed when
     /// blob signatures are introduced
@@ -475,10 +468,10 @@ impl Blob {
     }

     pub fn data(&self) -> &[u8] {
-        &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE]
+        &self.data[BLOB_HEADER_SIZE..]
     }

     pub fn data_mut(&mut self) -> &mut [u8] {
-        &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE]
+        &mut self.data[BLOB_HEADER_SIZE..]
     }

     pub fn size(&self) -> usize {
         let size = self.data_size() as usize;
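For context (not part of the commit): the restored align! macro rounds its argument up to the next multiple of a power-of-two alignment; `+` binds tighter than `&` in Rust, so the body parses as (x + (align - 1)) & !(align - 1). A quick standalone check of that arithmetic:

macro_rules! align {
    ($x:expr, $align:expr) => {
        $x + ($align - 1) & !($align - 1)
    };
}

fn main() {
    assert_eq!(align!(85usize, 16usize), 96); // rounds up to 6 * 16
    assert_eq!(align!(96usize, 16usize), 96); // already aligned: unchanged
    assert_eq!(align!(1usize, 8usize), 8);
}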


@@ -4,7 +4,7 @@
 use crate::blocktree::Blocktree;
 use crate::cluster_info::ClusterInfo;
 use crate::leader_schedule_cache::LeaderScheduleCache;
-use crate::packet::{Blob, SharedBlob};
+use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
 use crate::repair_service::{RepairService, RepairStrategy};
 use crate::result::{Error, Result};
 use crate::service::Service;
@@ -62,14 +62,19 @@ pub fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result
     }
     }))?;

-    blocktree.put_many_coding_blobs(blobs.iter().filter_map(move |blob| {
-        if blob.is_coding() {
-            Some(&**blob)
-        } else {
-            None
-        }
-    }))?;
+    for blob in blobs {
+        // TODO: Once the original leader signature is added to the blob, make sure that
+        // the blob was originally generated by the expected leader for this slot
+
+        // Insert the new blob into block tree
+        if blob.is_coding() {
+            blocktree.put_coding_blob_bytes(
+                blob.slot(),
+                blob.index(),
+                &blob.data[..BLOB_HEADER_SIZE + blob.size()],
+            )?;
+        }
+    }

     Ok(())
 }