Implement new Index Column (#4827)

* Implement new Index Column

* Correct slicing of blobs

* Mark coding blobs as coding when they're recovered

* Prevent broadcast stages from mixing coding and data blobs in blocktree

* Mark recovered blobs as present in the index

* Fix indexing error in recovery

* Fix broken tests, and some bug fixes

* Increase min stack size for coverage runs
Mark E. Sinclair 2019-07-10 13:08:17 -05:00 committed by Pankaj Garg
parent b1a678b2db
commit a383ea532f
13 changed files with 888 additions and 586 deletions
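The heart of the change: ErasureMeta previously tracked blob presence in two fixed 64-bit bitfields (data, coding), one per erasure set; the new Index column family records presence per slot in BTreeSets instead, so presence can be queried by arbitrary index ranges. A minimal standalone sketch of that presence-tracking idea (illustrative only; the real types are in the meta.rs diff below):

    // Minimal standalone sketch of the presence tracking behind the new
    // Index column (illustrative; the real types live in blocktree/meta.rs).
    use std::collections::BTreeSet;
    use std::ops::RangeBounds;

    #[derive(Default)]
    struct PresenceIndex {
        index: BTreeSet<u64>,
    }

    impl PresenceIndex {
        fn set_present(&mut self, idx: u64, present: bool) {
            if present {
                self.index.insert(idx);
            } else {
                self.index.remove(&idx);
            }
        }

        // Counting within a range is what ErasureMeta::status() needs
        // per erasure set.
        fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
            self.index.range(bounds).count()
        }
    }

    fn main() {
        let mut data = PresenceIndex::default();
        for i in 0..8 {
            data.set_present(i, true);
        }
        data.set_present(3, false);
        assert_eq!(data.present_in_bounds(0..8), 7);
    }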

(File diff suppressed because it is too large.)


@@ -39,6 +39,10 @@ pub mod columns {
     #[derive(Debug)]
     /// The root column
     pub struct Root;
 
+    #[derive(Debug)]
+    /// The index column
+    pub struct Index;
 }
 
 pub trait Backend: Sized + Send + Sync {


@@ -100,6 +100,25 @@ impl Column<Kvs> for cf::Data {
     }
 }
 
+impl Column<Kvs> for cf::Index {
+    const NAME: &'static str = super::INDEX_CF;
+    type Index = u64;
+
+    fn key(slot: u64) -> Key {
+        let mut key = Key::default();
+        BigEndian::write_u64(&mut key.0[8..16], slot);
+        key
+    }
+
+    fn index(key: &Key) -> u64 {
+        BigEndian::read_u64(&key.0[8..16])
+    }
+}
+
+impl TypedColumn<Kvs> for cf::Index {
+    type Type = crate::blocktree::meta::Index;
+}
+
 impl Column<Kvs> for cf::DeadSlots {
     const NAME: &'static str = super::DEAD_SLOTS;
     type Index = u64;
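Both backends key the new column by slot, encoded as a big-endian u64 so that byte-wise key order matches numeric slot order. A quick roundtrip sketch (assumes the byteorder crate, which the diff already uses; the 16-byte layout mirrors the Kvs Key above):

    // Sketch of the slot-key encoding used above (requires the byteorder crate).
    use byteorder::{BigEndian, ByteOrder};

    fn main() {
        let slot: u64 = 42;
        let mut key = [0u8; 16];
        // Kvs keys are 16 bytes; the slot occupies bytes 8..16.
        BigEndian::write_u64(&mut key[8..16], slot);
        assert_eq!(BigEndian::read_u64(&key[8..16]), slot);

        // Big-endian byte order keeps lexicographic key order == numeric order.
        let mut key2 = [0u8; 16];
        BigEndian::write_u64(&mut key2[8..16], slot + 1);
        assert!(key < key2);
    }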


@@ -1,6 +1,6 @@
 use crate::erasure::{NUM_CODING, NUM_DATA};
 use solana_metrics::datapoint;
-use std::borrow::Borrow;
+use std::{collections::BTreeSet, ops::RangeBounds};
 
 #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
 // The Meta column family
@@ -27,6 +27,116 @@ pub struct SlotMeta {
     pub is_connected: bool,
 }
 
+#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
+/// Index recording presence/absence of blobs
+pub struct Index {
+    pub slot: u64,
+    data: DataIndex,
+    coding: CodingIndex,
+}
+
+#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
+/// Index for data blobs in the slot
+pub struct DataIndex {
+    /// Set of data blob indexes that are present
+    index: BTreeSet<u64>,
+}
+
+#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
+/// Index for coding blobs in the slot
+pub struct CodingIndex {
+    /// Set of coding blob indexes that are present
+    index: BTreeSet<u64>,
+}
+
+#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
+/// Erasure coding information
+pub struct ErasureMeta {
+    /// Which erasure set in the slot this is
+    pub set_index: u64,
+    /// Size of shards in this erasure set
+    pub size: usize,
+}
+
+#[derive(Debug, PartialEq)]
+pub enum ErasureMetaStatus {
+    CanRecover,
+    DataFull,
+    StillNeed(usize),
+}
+
+impl Index {
+    pub(in crate::blocktree) fn new(slot: u64) -> Self {
+        Index {
+            slot,
+            data: DataIndex::default(),
+            coding: CodingIndex::default(),
+        }
+    }
+
+    pub fn data(&self) -> &DataIndex {
+        &self.data
+    }
+
+    pub fn coding(&self) -> &CodingIndex {
+        &self.coding
+    }
+
+    pub fn data_mut(&mut self) -> &mut DataIndex {
+        &mut self.data
+    }
+
+    pub fn coding_mut(&mut self) -> &mut CodingIndex {
+        &mut self.coding
+    }
+}
+
+/// TODO: Mark: Change this when coding
+impl CodingIndex {
+    pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
+        self.index.range(bounds).count()
+    }
+
+    pub fn is_present(&self, index: u64) -> bool {
+        self.index.contains(&index)
+    }
+
+    pub fn set_present(&mut self, index: u64, presence: bool) {
+        if presence {
+            self.index.insert(index);
+        } else {
+            self.index.remove(&index);
+        }
+    }
+
+    pub fn set_many_present(&mut self, presence: impl IntoIterator<Item = (u64, bool)>) {
+        for (idx, present) in presence.into_iter() {
+            self.set_present(idx, present);
+        }
+    }
+}
+
+impl DataIndex {
+    pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
+        self.index.range(bounds).count()
+    }
+
+    pub fn is_present(&self, index: u64) -> bool {
+        self.index.contains(&index)
+    }
+
+    pub fn set_present(&mut self, index: u64, presence: bool) {
+        if presence {
+            self.index.insert(index);
+        } else {
+            self.index.remove(&index);
+        }
+    }
+
+    pub fn set_many_present(&mut self, presence: impl IntoIterator<Item = (u64, bool)>) {
+        for (idx, present) in presence.into_iter() {
+            self.set_present(idx, present);
+        }
+    }
+}
+
 impl SlotMeta {
     pub fn is_full(&self) -> bool {
         // last_index is std::u64::MAX when it has no information about how
@@ -72,62 +182,30 @@ impl SlotMeta {
     }
 }
 
-#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
-/// Erasure coding information
-pub struct ErasureMeta {
-    /// Which erasure set in the slot this is
-    pub set_index: u64,
-    /// Size of shards in this erasure set
-    size: usize,
-    /// Bitfield representing presence/absence of data blobs
-    data: u64,
-    /// Bitfield representing presence/absence of coding blobs
-    coding: u64,
-}
-
-#[derive(Debug, PartialEq)]
-pub enum ErasureMetaStatus {
-    CanRecover,
-    DataFull,
-    StillNeed(usize),
-}
-
 impl ErasureMeta {
     pub fn new(set_index: u64) -> ErasureMeta {
-        ErasureMeta {
-            set_index,
-            size: 0,
-            data: 0,
-            coding: 0,
-        }
+        ErasureMeta { set_index, size: 0 }
     }
 
-    pub fn status(&self) -> ErasureMetaStatus {
-        let (data_missing, coding_missing) =
-            (NUM_DATA - self.num_data(), NUM_CODING - self.num_coding());
-        if data_missing > 0 && data_missing + coding_missing <= NUM_CODING {
-            //assert!(self.size != 0);
-            ErasureMetaStatus::CanRecover
+    pub fn status(&self, index: &Index) -> ErasureMetaStatus {
+        use ErasureMetaStatus::*;
+
+        let start_idx = self.start_index();
+        let (data_end_idx, coding_end_idx) = self.end_indexes();
+
+        let num_coding = index.coding().present_in_bounds(start_idx..coding_end_idx);
+        let num_data = index.data().present_in_bounds(start_idx..data_end_idx);
+
+        let (data_missing, coding_missing) = (NUM_DATA - num_data, NUM_CODING - num_coding);
+        let total_missing = data_missing + coding_missing;
+
+        if data_missing > 0 && total_missing <= NUM_CODING {
+            CanRecover
         } else if data_missing == 0 {
-            ErasureMetaStatus::DataFull
+            DataFull
         } else {
-            ErasureMetaStatus::StillNeed(data_missing + coding_missing - NUM_CODING)
-        }
-    }
-
-    pub fn num_coding(&self) -> usize {
-        self.coding.count_ones() as usize
-    }
-
-    pub fn num_data(&self) -> usize {
-        self.data.count_ones() as usize
-    }
-
-    pub fn is_coding_present(&self, index: u64) -> bool {
-        if let Some(position) = self.data_index_in_set(index) {
-            self.coding & (1 << position) != 0
-        } else {
-            false
+            StillNeed(total_missing - NUM_CODING)
         }
     }
@@ -139,72 +217,10 @@ impl ErasureMeta {
         self.size
     }
 
-    pub fn set_coding_present(&mut self, index: u64, present: bool) {
-        if let Some(position) = self.data_index_in_set(index) {
-            if present {
-                self.coding |= 1 << position;
-            } else {
-                self.coding &= !(1 << position);
-            }
-        }
-    }
-
-    pub fn is_data_present(&self, index: u64) -> bool {
-        if let Some(position) = self.data_index_in_set(index) {
-            self.data & (1 << position) != 0
-        } else {
-            false
-        }
-    }
-
-    pub fn set_data_present(&mut self, index: u64, present: bool) {
-        if let Some(position) = self.data_index_in_set(index) {
-            if present {
-                self.data |= 1 << position;
-            } else {
-                self.data &= !(1 << position);
-            }
-        }
-    }
-
-    pub fn set_data_multi<I, Idx>(&mut self, indexes: I, present: bool)
-    where
-        I: IntoIterator<Item = Idx>,
-        Idx: Borrow<u64>,
-    {
-        for index in indexes.into_iter() {
-            self.set_data_present(*index.borrow(), present);
-        }
-    }
-
-    pub fn set_coding_multi<I, Idx>(&mut self, indexes: I, present: bool)
-    where
-        I: IntoIterator<Item = Idx>,
-        Idx: Borrow<u64>,
-    {
-        for index in indexes.into_iter() {
-            self.set_coding_present(*index.borrow(), present);
-        }
-    }
-
-    pub fn set_index_for(index: u64) -> u64 {
-        index / NUM_DATA as u64
-    }
-
-    pub fn data_index_in_set(&self, index: u64) -> Option<u64> {
-        let set_index = Self::set_index_for(index);
-
-        if set_index == self.set_index {
-            Some(index - self.start_index())
-        } else {
-            None
-        }
-    }
-
-    pub fn coding_index_in_set(&self, index: u64) -> Option<u64> {
-        self.data_index_in_set(index).map(|i| i + NUM_DATA as u64)
-    }
-
     pub fn start_index(&self) -> u64 {
         self.set_index * NUM_DATA as u64
     }
@@ -216,130 +232,59 @@ impl ErasureMeta {
     }
 }
 
-#[test]
-fn test_meta_indexes() {
-    use rand::{thread_rng, Rng};
-    // to avoid casts everywhere
-    const NUM_DATA: u64 = crate::erasure::NUM_DATA as u64;
-
-    let mut rng = thread_rng();
-
-    for _ in 0..100 {
-        let set_index = rng.gen_range(0, 1_000);
-        let blob_index = (set_index * NUM_DATA) + rng.gen_range(0, NUM_DATA);
-
-        assert_eq!(set_index, ErasureMeta::set_index_for(blob_index));
-
-        let e_meta = ErasureMeta::new(set_index);
-
-        assert_eq!(e_meta.start_index(), set_index * NUM_DATA);
-        let (data_end_idx, coding_end_idx) = e_meta.end_indexes();
-        assert_eq!(data_end_idx, (set_index + 1) * NUM_DATA);
-        assert_eq!(coding_end_idx, set_index * NUM_DATA + NUM_CODING as u64);
-    }
-
-    let mut e_meta = ErasureMeta::new(0);
-
-    assert_eq!(e_meta.data_index_in_set(0), Some(0));
-    assert_eq!(e_meta.data_index_in_set(NUM_DATA / 2), Some(NUM_DATA / 2));
-    assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), Some(NUM_DATA - 1));
-    assert_eq!(e_meta.data_index_in_set(NUM_DATA), None);
-    assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None);
-
-    e_meta.set_index = 1;
-
-    assert_eq!(e_meta.data_index_in_set(0), None);
-    assert_eq!(e_meta.data_index_in_set(NUM_DATA - 1), None);
-    assert_eq!(e_meta.data_index_in_set(NUM_DATA), Some(0));
-    assert_eq!(
-        e_meta.data_index_in_set(NUM_DATA * 2 - 1),
-        Some(NUM_DATA - 1)
-    );
-    assert_eq!(e_meta.data_index_in_set(std::u64::MAX), None);
-}
-
-#[test]
-fn test_meta_coding_present() {
-    let mut e_meta = ErasureMeta::default();
-
-    e_meta.set_coding_multi(0..NUM_CODING as u64, true);
-    for i in 0..NUM_CODING as u64 {
-        assert_eq!(e_meta.is_coding_present(i), true);
-    }
-    for i in NUM_CODING as u64..NUM_DATA as u64 {
-        assert_eq!(e_meta.is_coding_present(i), false);
-    }
-
-    e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 17) as u64);
-    let start_idx = e_meta.start_index();
-    e_meta.set_coding_multi(start_idx..start_idx + NUM_CODING as u64, true);
-
-    for i in start_idx..start_idx + NUM_CODING as u64 {
-        e_meta.set_coding_present(i, true);
-        assert_eq!(e_meta.is_coding_present(i), true);
-    }
-    for i in start_idx + NUM_CODING as u64..start_idx + NUM_DATA as u64 {
-        assert_eq!(e_meta.is_coding_present(i), false);
-    }
-}
-
-#[test]
-fn test_erasure_meta_status() {
-    use rand::{seq::SliceRandom, thread_rng};
-    // Local constants just used to avoid repetitive casts
-    const N_DATA: u64 = crate::erasure::NUM_DATA as u64;
-    const N_CODING: u64 = crate::erasure::NUM_CODING as u64;
-
-    let mut e_meta = ErasureMeta::default();
-    let mut rng = thread_rng();
-    let data_indexes: Vec<u64> = (0..N_DATA).collect();
-    let coding_indexes: Vec<u64> = (0..N_CODING).collect();
-
-    assert_eq!(e_meta.status(), ErasureMetaStatus::StillNeed(NUM_DATA));
-
-    e_meta.set_data_multi(0..N_DATA, true);
-
-    assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
-
-    e_meta.size = 1;
-    e_meta.set_coding_multi(0..N_CODING, true);
-
-    assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
-
-    for &idx in data_indexes.choose_multiple(&mut rng, NUM_CODING) {
-        e_meta.set_data_present(idx, false);
-
-        assert_eq!(e_meta.status(), ErasureMetaStatus::CanRecover);
-    }
-
-    e_meta.set_data_multi(0..N_DATA, true);
-
-    for &idx in coding_indexes.choose_multiple(&mut rng, NUM_CODING) {
-        e_meta.set_coding_present(idx, false);
-
-        assert_eq!(e_meta.status(), ErasureMetaStatus::DataFull);
-    }
-}
-
-#[test]
-fn test_meta_data_present() {
-    let mut e_meta = ErasureMeta::default();
-
-    e_meta.set_data_multi(0..NUM_DATA as u64, true);
-    for i in 0..NUM_DATA as u64 {
-        assert_eq!(e_meta.is_data_present(i), true);
-    }
-    for i in NUM_DATA as u64..2 * NUM_DATA as u64 {
-        assert_eq!(e_meta.is_data_present(i), false);
-    }
-
-    e_meta.set_index = ErasureMeta::set_index_for((NUM_DATA * 23) as u64);
-    let start_idx = e_meta.start_index();
-
-    e_meta.set_data_multi(start_idx..start_idx + NUM_DATA as u64, true);
-    for i in start_idx..start_idx + NUM_DATA as u64 {
-        assert_eq!(e_meta.is_data_present(i), true);
-    }
-    for i in start_idx - NUM_DATA as u64..start_idx {
-        assert_eq!(e_meta.is_data_present(i), false);
-    }
-}
+#[cfg(test)]
+mod test {
+    use super::*;
+    use rand::{seq::SliceRandom, thread_rng};
+    use std::iter::repeat;
+
+    #[test]
+    fn test_erasure_meta_status() {
+        use ErasureMetaStatus::*;
+
+        let set_index = 0;
+
+        let mut e_meta = ErasureMeta::new(set_index);
+        let mut rng = thread_rng();
+        let mut index = Index::new(0);
+
+        e_meta.size = 1;
+
+        let data_indexes = 0..NUM_DATA as u64;
+        let coding_indexes = 0..NUM_CODING as u64;
+
+        assert_eq!(e_meta.status(&index), StillNeed(NUM_DATA));
+
+        index
+            .data_mut()
+            .set_many_present(data_indexes.clone().zip(repeat(true)));
+
+        assert_eq!(e_meta.status(&index), DataFull);
+
+        index
+            .coding_mut()
+            .set_many_present(coding_indexes.clone().zip(repeat(true)));
+
+        for &idx in data_indexes
+            .clone()
+            .collect::<Vec<_>>()
+            .choose_multiple(&mut rng, NUM_DATA)
+        {
+            index.data_mut().set_present(idx, false);
+
+            assert_eq!(e_meta.status(&index), CanRecover);
+        }
+
+        index
+            .data_mut()
+            .set_many_present(data_indexes.zip(repeat(true)));
+
+        for &idx in coding_indexes
+            .collect::<Vec<_>>()
+            .choose_multiple(&mut rng, NUM_CODING)
+        {
+            index.coding_mut().set_present(idx, false);
+
+            assert_eq!(e_meta.status(&index), DataFull);
+        }
+    }
+}
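For intuition, status() reduces to counting the present shards in one erasure set: any NUM_DATA of the NUM_DATA + NUM_CODING shards are enough to reconstruct the data. A worked sketch of that decision with placeholder constants (not the crate's real values):

    // Worked sketch of the status() decision above, with the counts made
    // explicit. NUM_DATA/NUM_CODING here are placeholders, not the crate's.
    const NUM_DATA: usize = 8;
    const NUM_CODING: usize = 8;

    #[derive(Debug, PartialEq)]
    enum Status {
        CanRecover,
        DataFull,
        StillNeed(usize),
    }

    fn status(num_data: usize, num_coding: usize) -> Status {
        let (data_missing, coding_missing) = (NUM_DATA - num_data, NUM_CODING - num_coding);
        let total_missing = data_missing + coding_missing;
        if data_missing > 0 && total_missing <= NUM_CODING {
            // Any NUM_DATA of the NUM_DATA + NUM_CODING shards reconstruct the data.
            Status::CanRecover
        } else if data_missing == 0 {
            Status::DataFull
        } else {
            Status::StillNeed(total_missing - NUM_CODING)
        }
    }

    fn main() {
        assert_eq!(status(8, 0), Status::DataFull);
        assert_eq!(status(5, 6), Status::CanRecover); // 3 + 2 missing <= 8
        assert_eq!(status(0, 0), Status::StillNeed(8));
    }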


@@ -30,7 +30,9 @@ impl Backend for Rocks {
     type Error = rocksdb::Error;
 
     fn open(path: &Path) -> Result<Rocks> {
-        use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta};
+        use crate::blocktree::db::columns::{
+            Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, SlotMeta,
+        };
 
         fs::create_dir_all(&path)?;
@@ -40,12 +42,14 @@ impl Backend for Rocks {
         // Column family names
         let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
         let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
-        let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
+        let dead_slots_cf_descriptor =
+            ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
         let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
         let erasure_meta_cf_descriptor =
             ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
         let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
         let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
+        let index_cf_descriptor = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options());
 
         let cfs = vec![
             meta_cf_descriptor,
@@ -55,6 +59,7 @@ impl Backend for Rocks {
             erasure_meta_cf_descriptor,
             orphans_cf_descriptor,
             root_cf_descriptor,
+            index_cf_descriptor,
         ];
// Open the database
@@ -64,13 +69,16 @@ impl Backend for Rocks {
     }
 
     fn columns(&self) -> Vec<&'static str> {
-        use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta};
+        use crate::blocktree::db::columns::{
+            Coding, Data, DeadSlots, ErasureMeta, Index, Orphans, Root, SlotMeta,
+        };
 
         vec![
             Coding::NAME,
             ErasureMeta::NAME,
             DeadSlots::NAME,
             Data::NAME,
+            Index::NAME,
             Orphans::NAME,
             Root::NAME,
             SlotMeta::NAME,
@@ -164,6 +172,25 @@ impl Column<Rocks> for cf::Data {
     }
 }
 
+impl Column<Rocks> for cf::Index {
+    const NAME: &'static str = super::INDEX_CF;
+    type Index = u64;
+
+    fn key(slot: u64) -> Vec<u8> {
+        let mut key = vec![0; 8];
+        BigEndian::write_u64(&mut key[..], slot);
+        key
+    }
+
+    fn index(key: &[u8]) -> u64 {
+        BigEndian::read_u64(&key[..8])
+    }
+}
+
+impl TypedColumn<Rocks> for cf::Index {
+    type Type = crate::blocktree::meta::Index;
+}
+
 impl Column<Rocks> for cf::DeadSlots {
     const NAME: &'static str = super::DEAD_SLOTS_CF;
     type Index = u64;
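RocksDB requires every column family to be declared when the database is opened, which is why the new Index column has to be threaded through both open() and columns() above. A hedged sketch of the same registration pattern against the rust-rocksdb API (path and family names are examples, not the crate's):

    // Sketch of registering an extra column family with rust-rocksdb,
    // mirroring the index_cf_descriptor addition above.
    use rocksdb::{ColumnFamilyDescriptor, Options, DB};

    fn main() -> Result<(), rocksdb::Error> {
        let mut db_opts = Options::default();
        db_opts.create_if_missing(true);
        db_opts.create_missing_column_families(true);

        let cfs = vec![
            ColumnFamilyDescriptor::new("meta", Options::default()),
            ColumnFamilyDescriptor::new("index", Options::default()),
        ];

        // Every column family must be listed at open time; forgetting one
        // makes the open fail, hence the edits to both open() and columns().
        let db = DB::open_cf_descriptors(&db_opts, "/tmp/example-db", cfs)?;
        drop(db);
        Ok(())
    }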


@@ -65,7 +65,8 @@ impl BroadcastRun for BroadcastBadBlobSizes {
             w_b.meta.size = real_size;
         }
 
-        blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
+        blocktree.write_shared_blobs(data_blobs.iter())?;
+        blocktree.put_shared_coding_blobs(coding_blobs.iter())?;
// 3) Start broadcast step
let bank_epoch = bank.get_stakers_epoch(bank.slot());


@@ -73,7 +73,8 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
             self.last_blockhash = Hash::default();
         }
 
-        blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
+        blocktree.write_shared_blobs(data_blobs.iter())?;
+        blocktree.put_shared_coding_blobs(coding_blobs.iter())?;
// Set the forwarded flag to true, so that the blobs won't be forwarded to peers
data_blobs


@@ -52,7 +52,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             &mut broadcast.coding_generator,
         );
 
-        blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
+        blocktree.write_shared_blobs(data_blobs.iter())?;
+        blocktree.put_shared_coding_blobs(coding_blobs.iter())?;
// 3) Start broadcast step
let bank_epoch = bank.get_stakers_epoch(bank.slot());


@@ -82,7 +82,9 @@ impl BroadcastRun for StandardBroadcastRun {
             &mut broadcast.coding_generator,
        );
 
-        blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
+        blocktree.write_shared_blobs(data_blobs.iter())?;
+        blocktree.put_shared_coding_blobs(coding_blobs.iter())?;
let to_blobs_elapsed = to_blobs_start.elapsed();
// 3) Start broadcast step
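All four broadcast stages get the same two-line fix: the blocktree now indexes data and coding blobs separately, so a mixed write would record coding blobs in the data index. The invariant, sketched with a plain partition (Blob here is a stand-in struct, not the crate's):

    // Sketch of the invariant the broadcast changes enforce: data and coding
    // blobs take separate insertion paths.
    #[derive(Debug)]
    struct Blob {
        index: u64,
        is_coding: bool,
    }

    fn main() {
        let blobs = vec![
            Blob { index: 0, is_coding: false },
            Blob { index: 8, is_coding: true },
            Blob { index: 1, is_coding: false },
        ];

        // Route each kind to its own sink, as write_shared_blobs /
        // put_shared_coding_blobs now do for the blocktree.
        let (coding, data): (Vec<_>, Vec<_>) = blobs.into_iter().partition(|b| b.is_coding);
        assert_eq!(data.len(), 2);
        assert_eq!(coding.len(), 1);
    }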


@@ -138,23 +138,25 @@ impl Session {
         if n < NUM_DATA {
             let mut blob = Blob::new(&blocks[n]);
             blob.meta.size = blob.data_size() as usize;
-            data_size = blob.data_size() as usize - BLOB_HEADER_SIZE;
+            data_size = blob.data_size() as usize;
             idx = n as u64 + block_start_idx;
             first_byte = blob.data[0];
 
             blob.set_size(data_size);
             recovered_data.push(blob);
         } else {
             let mut blob = Blob::default();
-            blob.data_mut()[..size].copy_from_slice(&blocks[n]);
+            blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size].copy_from_slice(&blocks[n]);
             blob.meta.size = size;
             data_size = size;
-            idx = (n as u64 + block_start_idx) - NUM_DATA as u64;
+            idx = n as u64 + block_start_idx - NUM_DATA as u64;
             first_byte = blob.data[0];
 
             blob.set_slot(slot);
             blob.set_index(idx);
             blob.set_size(data_size);
+            blob.set_coding();
             recovered_coding.push(blob);
         }
@@ -240,7 +242,7 @@ impl CodingGenerator {
         if {
             let mut coding_ptrs: Vec<_> = coding_blobs
                 .iter_mut()
-                .map(|blob| &mut blob.data_mut()[..max_data_size])
+                .map(|blob| &mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + max_data_size])
                 .collect();
 
             self.session.encode(&data_ptrs, coding_ptrs.as_mut_slice())
@@ -427,7 +429,7 @@ pub mod test {
     }
 
     #[test]
-    fn test_erasure_generate_coding() {
+    fn test_generate_coding() {
         solana_logger::setup();
 
         // trivial case
@@ -449,10 +451,10 @@
             assert_eq!(coding_blobs.len(), NUM_CODING);
 
             for j in 0..NUM_CODING {
-                assert_eq!(
-                    coding_blobs[j].read().unwrap().index(),
-                    ((i / NUM_DATA) * NUM_DATA + j) as u64
-                );
+                let coding_blob = coding_blobs[j].read().unwrap();
+                //assert_eq!(coding_blob.index(), (i * NUM_DATA + j % NUM_CODING) as u64);
+                assert!(coding_blob.is_coding());
             }
 
             test_toss_and_recover(
                 &coding_generator.session,
@@ -654,6 +656,8 @@
         S: Borrow<SlotSpec>,
     {
         let mut coding_generator = CodingGenerator::default();
+        let keypair = Keypair::new();
+        let bytes = keypair.to_bytes();
 
         specs.into_iter().map(move |spec| {
             let spec = spec.borrow();
@@ -666,14 +670,14 @@
                 let set_index = erasure_spec.set_index as usize;
                 let start_index = set_index * NUM_DATA;
 
-                let mut blobs = generate_test_blobs(0, NUM_DATA);
-                index_blobs(
-                    &blobs,
-                    &Keypair::new().pubkey(),
-                    start_index as u64,
-                    slot,
-                    0,
-                );
+                let mut blobs = generate_test_blobs(start_index, NUM_DATA);
+                let keypair = Keypair::from_bytes(&bytes).unwrap();
+                index_blobs(&blobs, &keypair.pubkey(), start_index as u64, slot, 0);
+                // Signing has to be deferred until all data/header fields are set correctly
+                blobs.iter().for_each(|blob| {
+                    blob.write().unwrap().sign(&keypair);
+                });
 
                 let mut coding_blobs = coding_generator.next(&blobs);
@@ -738,9 +742,8 @@
             .into_iter()
             .map(|_| {
                 let mut blob = Blob::default();
-                blob.data_mut()[..data.len()].copy_from_slice(&data);
-                blob.set_size(data.len());
-                blob.sign(&Keypair::new());
+                blob.data_mut()[..].copy_from_slice(&data);
+                blob.set_size(BLOB_DATA_SIZE);
                 Arc::new(RwLock::new(blob))
             })
             .collect();
@@ -771,7 +774,7 @@
                 if i < NUM_DATA {
                     &mut blob.data[..size]
                 } else {
-                    &mut blob.data_mut()[..size]
+                    &mut blob.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + size]
                 }
             })
             .collect();
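One detail worth calling out from the test generator above: Keypair does not implement Clone, and a move closure consumes what it captures, so the generator captures keypair.to_bytes() and rebuilds the signer with Keypair::from_bytes on each iteration. The trick in isolation (assuming solana-sdk's Keypair and KeypairUtil, as the diff does):

    // Sketch of reusing one signing identity inside a move closure by
    // round-tripping the key through its byte representation.
    use solana_sdk::signature::{Keypair, KeypairUtil};

    fn main() {
        let keypair = Keypair::new();
        let bytes = keypair.to_bytes(); // byte arrays are Copy, Keypair is not

        let specs = vec![0u64, 1, 2];
        let _pubkeys: Vec<_> = specs
            .into_iter()
            .map(move |_spec| {
                // Rebuild an identical signer on every iteration.
                let keypair = Keypair::from_bytes(&bytes).unwrap();
                keypair.pubkey()
            })
            .collect();
    }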


@@ -515,10 +515,10 @@ impl Blob {
     }
 
     pub fn data(&self) -> &[u8] {
-        &self.data[BLOB_HEADER_SIZE..]
+        &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE]
     }
 
     pub fn data_mut(&mut self) -> &mut [u8] {
-        &mut self.data[BLOB_HEADER_SIZE..]
+        &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE]
     }
 
     pub fn size(&self) -> usize {
         let size = self.data_size() as usize;
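Bounding data() and data_mut() on both ends means callers cannot slice past the data region, presumably what the corrected slicing in erasure.rs relies on; the real constants live in packet.rs. A toy sketch of the bounded accessor with made-up sizes:

    // Toy sketch of a bounded accessor over a fixed buffer; sizes are
    // invented for illustration, not the crate's constants.
    const BLOB_HEADER_SIZE: usize = 8;
    const BLOB_DATA_SIZE: usize = 32;
    const BLOB_SIZE: usize = 48; // header + data + trailing space

    struct Blob {
        data: [u8; BLOB_SIZE],
    }

    impl Blob {
        fn data(&self) -> &[u8] {
            // Bounded on both ends, as in the fixed accessor above.
            &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE]
        }
    }

    fn main() {
        let blob = Blob { data: [0u8; BLOB_SIZE] };
        assert_eq!(blob.data().len(), BLOB_DATA_SIZE);
    }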


@@ -4,7 +4,7 @@
 use crate::blocktree::Blocktree;
 use crate::cluster_info::ClusterInfo;
 use crate::leader_schedule_cache::LeaderScheduleCache;
-use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
+use crate::packet::{Blob, SharedBlob};
 use crate::repair_service::{RepairService, RepairStrategy};
 use crate::result::{Error, Result};
 use crate::service::Service;
@@ -28,11 +28,12 @@ pub const NUM_THREADS: u32 = 10;
 fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) -> Result<()> {
     let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
     for blob in blobs {
+        let mut blob_guard = blob.write().unwrap();
         // Don't add blobs generated by this node to the retransmit queue
-        if blob.read().unwrap().id() != *id {
-            let mut w_blob = blob.write().unwrap();
-            w_blob.meta.forward = w_blob.should_forward();
-            w_blob.set_forwarded(false);
+        if blob_guard.id() != *id && !blob_guard.is_coding() {
+            //let mut w_blob = blob.write().unwrap();
+            blob_guard.meta.forward = blob_guard.should_forward();
+            blob_guard.set_forwarded(false);
             retransmit_queue.push(blob.clone());
         }
     }
@@ -52,29 +53,17 @@ fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey)
 /// Process a blob: Add blob to the ledger window.
 pub fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result<()> {
     // make an iterator for insert_data_blobs()
-    let blobs: Vec<_> = blobs.iter().map(move |blob| blob.read().unwrap()).collect();
+    //let blobs: Vec<_> = blobs.iter().map(move |blob| blob.read().unwrap()).collect();
 
-    blocktree.insert_data_blobs(blobs.iter().filter_map(|blob| {
-        if !blob.is_coding() {
-            Some(&(**blob))
-        } else {
-            None
-        }
-    }))?;
+    blocktree.write_shared_blobs(
+        blobs
+            .iter()
+            .filter(|blob| !blob.read().unwrap().is_coding()),
+    )?;
 
-    for blob in blobs {
-        // TODO: Once the original leader signature is added to the blob, make sure that
-        // the blob was originally generated by the expected leader for this slot
+    blocktree
+        .put_shared_coding_blobs(blobs.iter().filter(|blob| blob.read().unwrap().is_coding()))?;
 
-        // Insert the new blob into block tree
-        if blob.is_coding() {
-            blocktree.put_coding_blob_bytes(
-                blob.slot(),
-                blob.index(),
-                &blob.data[..BLOB_HEADER_SIZE + blob.size()],
-            )?;
-        }
-    }
     Ok(())
 }
@@ -215,6 +204,8 @@ impl WindowService {
         let bank_forks = bank_forks.clone();
         let t_window = Builder::new()
             .name("solana-window".to_string())
+            // TODO: Mark: Why is it overflowing
+            .stack_size(8 * 1024 * 1024)
             .spawn(move || {
                 let _exit = Finalizer::new(exit.clone());
                 let id = cluster_info.read().unwrap().id();
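The window thread above now asks for an 8 MiB stack explicitly rather than relying on the platform default. The same std API in isolation:

    // Requesting an explicit stack size for a spawned thread, as the
    // window service now does.
    use std::thread::Builder;

    fn main() {
        let handle = Builder::new()
            .name("solana-window".to_string())
            .stack_size(8 * 1024 * 1024) // bytes; matches RUST_MIN_STACK=8388608 below
            .spawn(|| {
                // Deeply recursive or stack-hungry work goes here.
            })
            .unwrap();
        handle.join().unwrap();
    }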


@@ -27,6 +27,7 @@ coverageFlags+=("-Coverflow-checks=off") # Disable overflow checks, which create
 export RUSTFLAGS="${coverageFlags[*]}"
 export CARGO_INCREMENTAL=0
 export RUST_BACKTRACE=1
+export RUST_MIN_STACK=8388608
 
 echo "--- remove old coverage results"
 if [[ -d target/cov ]]; then