Add Capabilities to Signal BroadcastStage to Retransmit (#8899)

This commit is contained in:
carllin 2020-03-19 23:35:01 -07:00 committed by GitHub
parent c68e80c93b
commit dc1db33ec9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 303 additions and 61 deletions

View File

@ -1,30 +1,41 @@
//! A stage to broadcast data from a leader node to validators
use self::broadcast_fake_shreds_run::BroadcastFakeShredsRun;
use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun;
use self::standard_broadcast_run::StandardBroadcastRun;
use crate::cluster_info::{ClusterInfo, ClusterInfoError};
use crate::poh_recorder::WorkingBankEntry;
use crate::result::{Error, Result};
use solana_ledger::blockstore::Blockstore;
use solana_ledger::shred::Shred;
use solana_ledger::staking_utils;
use self::{
broadcast_fake_shreds_run::BroadcastFakeShredsRun,
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
standard_broadcast_run::StandardBroadcastRun,
};
use crate::{
cluster_info::{ClusterInfo, ClusterInfoError},
poh_recorder::WorkingBankEntry,
result::{Error, Result},
};
use crossbeam_channel::{
Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError,
Sender as CrossbeamSender,
};
use solana_ledger::{blockstore::Blockstore, shred::Shred, staking_utils};
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Instant;
pub const NUM_INSERT_THREADS: usize = 2;
use solana_runtime::bank::Bank;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::{
collections::HashMap,
net::UdpSocket,
sync::atomic::{AtomicBool, Ordering},
sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender},
sync::{Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
};
mod broadcast_fake_shreds_run;
pub(crate) mod broadcast_utils;
mod fail_entry_verification_broadcast_run;
mod standard_broadcast_run;
pub const NUM_INSERT_THREADS: usize = 2;
pub type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
pub type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BroadcastStageReturnType {
ChannelDisconnected,
@ -43,6 +54,7 @@ impl BroadcastStageType {
sock: Vec<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
receiver: Receiver<WorkingBankEntry>,
retransmit_slots_receiver: RetransmitSlotsReceiver,
exit_sender: &Arc<AtomicBool>,
blockstore: &Arc<Blockstore>,
shred_version: u16,
@ -53,6 +65,7 @@ impl BroadcastStageType {
sock,
cluster_info,
receiver,
retransmit_slots_receiver,
exit_sender,
blockstore,
StandardBroadcastRun::new(keypair, shred_version),
@ -62,6 +75,7 @@ impl BroadcastStageType {
sock,
cluster_info,
receiver,
retransmit_slots_receiver,
exit_sender,
blockstore,
FailEntryVerificationBroadcastRun::new(keypair, shred_version),
@ -71,6 +85,7 @@ impl BroadcastStageType {
sock,
cluster_info,
receiver,
retransmit_slots_receiver,
exit_sender,
blockstore,
BroadcastFakeShredsRun::new(keypair, 0, shred_version),
@ -79,7 +94,7 @@ impl BroadcastStageType {
}
}
type TransmitShreds = (Option<Arc<HashMap<Pubkey, u64>>>, Arc<Vec<Shred>>);
pub type TransmitShreds = (Option<Arc<HashMap<Pubkey, u64>>>, Arc<Vec<Shred>>);
trait BroadcastRun {
fn run(
&mut self,
@ -135,25 +150,27 @@ impl BroadcastStage {
loop {
let res =
broadcast_stage_run.run(blockstore, receiver, socket_sender, blockstore_sender);
let res = Self::handle_error(res);
let res = Self::handle_error(res, "run");
if let Some(res) = res {
return res;
}
}
}
fn handle_error(r: Result<()>) -> Option<BroadcastStageReturnType> {
fn handle_error(r: Result<()>, name: &str) -> Option<BroadcastStageReturnType> {
if let Err(e) = r {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected)
| Error::SendError
| Error::RecvError(RecvError) => {
| Error::RecvError(RecvError)
| Error::CrossbeamRecvTimeoutError(CrossbeamRecvTimeoutError::Disconnected) => {
return Some(BroadcastStageReturnType::ChannelDisconnected);
}
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::RecvTimeoutError(RecvTimeoutError::Timeout)
| Error::CrossbeamRecvTimeoutError(CrossbeamRecvTimeoutError::Timeout) => (),
Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
_ => {
inc_new_counter_error!("streamer-broadcaster-error", 1, 1);
error!("broadcaster error: {:?}", e);
error!("{} broadcaster error: {:?}", name, e);
}
}
}
@ -180,6 +197,7 @@ impl BroadcastStage {
socks: Vec<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
receiver: Receiver<WorkingBankEntry>,
retransmit_slots_receiver: RetransmitSlotsReceiver,
exit_sender: &Arc<AtomicBool>,
blockstore: &Arc<Blockstore>,
broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone,
@ -189,6 +207,8 @@ impl BroadcastStage {
let (socket_sender, socket_receiver) = channel();
let (blockstore_sender, blockstore_receiver) = channel();
let bs_run = broadcast_stage_run.clone();
let socket_sender_ = socket_sender.clone();
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
@ -196,7 +216,7 @@ impl BroadcastStage {
Self::run(
&btree,
&receiver,
&socket_sender,
&socket_sender_,
&blockstore_sender,
bs_run,
)
@ -212,7 +232,7 @@ impl BroadcastStage {
.name("solana-broadcaster-transmit".to_string())
.spawn(move || loop {
let res = bs_transmit.transmit(&socket_receiver, &cluster_info, &sock);
let res = Self::handle_error(res);
let res = Self::handle_error(res, "solana-broadcaster-transmit");
if let Some(res) = res {
return res;
}
@ -229,7 +249,7 @@ impl BroadcastStage {
.name("solana-broadcaster-record".to_string())
.spawn(move || loop {
let res = bs_record.record(&blockstore_receiver, &btree);
let res = Self::handle_error(res);
let res = Self::handle_error(res, "solana-broadcaster-record");
if let Some(res) = res {
return res;
}
@ -238,9 +258,68 @@ impl BroadcastStage {
thread_hdls.push(t);
}
let blockstore = blockstore.clone();
let retransmit_thread = Builder::new()
.name("solana-broadcaster-retransmit".to_string())
.spawn(move || loop {
if let Some(res) = Self::handle_error(
Self::check_retransmit_signals(
&blockstore,
&retransmit_slots_receiver,
&socket_sender,
),
"solana-broadcaster-retransmit-check_retransmit_signals",
) {
return res;
}
})
.unwrap();
thread_hdls.push(retransmit_thread);
Self { thread_hdls }
}
/// Drains retransmit signals from `retransmit_slots_receiver` and re-sends the
/// shreds already stored in `blockstore` for each signaled slot down
/// `socket_sender`.
///
/// Blocks up to 100ms waiting for the first signal, then greedily merges every
/// additional pending signal into one map so each slot is retransmitted at
/// most once per call (later signals for the same slot overwrite earlier ones).
///
/// # Errors
/// Returns an error if the receiver disconnects/times out or if the socket
/// channel is closed.
fn check_retransmit_signals(
    blockstore: &Blockstore,
    retransmit_slots_receiver: &RetransmitSlotsReceiver,
    socket_sender: &Sender<TransmitShreds>,
) -> Result<()> {
    // Check for a retransmit signal
    let mut retransmit_slots =
        retransmit_slots_receiver.recv_timeout(Duration::from_millis(100))?;
    // Coalesce any other queued signals; `extend` deduplicates by slot key.
    while let Ok(more_slots) = retransmit_slots_receiver.try_recv() {
        retransmit_slots.extend(more_slots);
    }

    for bank in retransmit_slots.values() {
        let slot = bank.slot();
        let bank_epoch = bank.get_leader_schedule_epoch(slot);
        // Stakes tag each transmit batch so downstream can weight peers.
        let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch).map(Arc::new);

        let data_shreds = Arc::new(
            blockstore
                .get_data_shreds_for_slot(slot, 0)
                .expect("My own shreds must be reconstructable"),
        );
        if !data_shreds.is_empty() {
            socket_sender.send((stakes.clone(), data_shreds))?;
        }

        let coding_shreds = Arc::new(
            blockstore
                .get_coding_shreds_for_slot(slot, 0)
                .expect("My own shreds must be reconstructable"),
        );
        if !coding_shreds.is_empty() {
            socket_sender.send((stakes.clone(), coding_shreds))?;
        }
    }

    Ok(())
}
pub fn join(self) -> thread::Result<BroadcastStageReturnType> {
for thread_hdl in self.thread_hdls.into_iter() {
let _ = thread_hdl.join();
@ -250,22 +329,139 @@ impl BroadcastStage {
}
#[cfg(test)]
mod test {
pub mod test {
use super::*;
use crate::cluster_info::{ClusterInfo, Node};
use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_ledger::entry::create_ticks;
use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path};
use crate::{
cluster_info::{ClusterInfo, Node},
genesis_utils::{create_genesis_config, GenesisConfigInfo},
};
use crossbeam_channel::unbounded;
use solana_ledger::{
blockstore::{make_slot_entries, Blockstore},
entry::create_ticks,
get_tmp_ledger_path,
shred::{max_ticks_per_n_shreds, Shredder, RECOMMENDED_FEC_RATE},
};
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, Signer};
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;
use solana_sdk::{
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
};
use std::{
path::Path,
sync::atomic::AtomicBool,
sync::mpsc::channel,
sync::{Arc, RwLock},
thread::sleep,
};
/// Builds `num` data shreds (plus their coding shreds) for `slot` and wraps
/// each shred in its own single-element `TransmitShreds` batch tagged with
/// the given `stakes`.
///
/// Returns `(data_shreds, coding_shreds, data_batches, coding_batches)`.
pub fn make_transmit_shreds(
    slot: Slot,
    num: u64,
    stakes: Option<Arc<HashMap<Pubkey, u64>>>,
) -> (
    Vec<Shred>,
    Vec<Shred>,
    Vec<TransmitShreds>,
    Vec<TransmitShreds>,
) {
    // Enough ticks to guarantee at least `num` data shreds are produced.
    let num_entries = max_ticks_per_n_shreds(num);
    let (data_shreds, _) = make_slot_entries(slot, 0, num_entries);
    let keypair = Arc::new(Keypair::new());
    let shredder = Shredder::new(slot, 0, RECOMMENDED_FEC_RATE, keypair, 0, 0)
        .expect("Expected to create a new shredder");
    let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..]);

    // One TransmitShreds batch per shred, each carrying a copy of `stakes`.
    let to_batches = |shreds: &[Shred]| -> Vec<TransmitShreds> {
        shreds
            .iter()
            .cloned()
            .map(|shred| (stakes.clone(), Arc::new(vec![shred])))
            .collect()
    };

    let data_batches = to_batches(&data_shreds);
    let coding_batches = to_batches(&coding_shreds);
    (data_shreds, coding_shreds, data_batches, coding_batches)
}
/// Drains `transmit_receiver` and asserts that exactly
/// `num_expected_data_shreds` data shreds and `num_expected_coding_shreds`
/// coding shreds arrive, each kind in strictly increasing index order
/// starting from `data_index` / `coding_index` respectively.
fn check_all_shreds_received(
    transmit_receiver: &Receiver<TransmitShreds>,
    mut data_index: u64,
    mut coding_index: u64,
    num_expected_data_shreds: u64,
    num_expected_coding_shreds: u64,
) {
    while let Ok((_stakes, shreds)) = transmit_receiver.try_recv() {
        // Batches are homogeneous: inspect the first shred to classify.
        if shreds[0].is_data() {
            for shred in shreds.iter() {
                assert_eq!(shred.index() as u64, data_index);
                data_index += 1;
            }
        } else {
            assert_eq!(shreds[0].index() as u64, coding_index);
            for shred in shreds.iter() {
                assert_eq!(shred.index() as u64, coding_index);
                coding_index += 1;
            }
        }
    }
    // Final indices double as received-counts; they must match expectations.
    assert_eq!(num_expected_data_shreds, data_index);
    assert_eq!(num_expected_coding_shreds, coding_index);
}
#[test]
fn test_duplicate_retransmit_signal() {
    // Setup: fresh ledger plus the channels check_retransmit_signals uses.
    let ledger_path = get_tmp_ledger_path!();
    let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
    let (transmit_sender, transmit_receiver) = channel();
    let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000);
    let bank0 = Arc::new(Bank::new(&genesis_config));

    // Make some shreds
    let updated_slot = 0;
    let (all_data_shreds, all_coding_shreds, _, _all_coding_transmit_shreds) =
        make_transmit_shreds(updated_slot, 10, None);
    let num_data_shreds = all_data_shreds.len();
    let num_coding_shreds = all_coding_shreds.len();
    assert!(num_data_shreds >= 10);

    // Insert all the shreds
    blockstore
        .insert_shreds(all_data_shreds, None, true)
        .unwrap();
    blockstore
        .insert_shreds(all_coding_shreds, None, true)
        .unwrap();

    // Insert duplicate retransmit signal, blocks should
    // only be retransmitted once
    for _ in 0..2 {
        retransmit_slots_sender
            .send(vec![(updated_slot, bank0.clone())].into_iter().collect())
            .unwrap();
    }
    BroadcastStage::check_retransmit_signals(
        &blockstore,
        &retransmit_slots_receiver,
        &transmit_sender,
    )
    .unwrap();

    // Check all the data shreds were received only once
    check_all_shreds_received(
        &transmit_receiver,
        0,
        0,
        num_data_shreds as u64,
        num_coding_shreds as u64,
    );
}
struct MockBroadcastStage {
blockstore: Arc<Blockstore>,
@ -277,6 +473,7 @@ mod test {
leader_pubkey: &Pubkey,
ledger_path: &Path,
entry_receiver: Receiver<WorkingBankEntry>,
retransmit_slots_receiver: RetransmitSlotsReceiver,
) -> MockBroadcastStage {
// Make the database ledger
let blockstore = Arc::new(Blockstore::open(ledger_path).unwrap());
@ -304,6 +501,7 @@ mod test {
leader_info.sockets.broadcast,
cluster_info,
entry_receiver,
retransmit_slots_receiver,
&exit_sender,
&blockstore,
StandardBroadcastRun::new(leader_keypair, 0),
@ -326,10 +524,12 @@ mod test {
let leader_keypair = Keypair::new();
let (entry_sender, entry_receiver) = channel();
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
let broadcast_service = setup_dummy_broadcast_service(
&leader_keypair.pubkey(),
&ledger_path,
entry_receiver,
retransmit_slots_receiver,
);
let start_tick_height;
let max_tick_height;
@ -348,6 +548,7 @@ mod test {
.expect("Expect successful send to broadcast service");
}
}
sleep(Duration::from_millis(2000));
trace!(
@ -364,6 +565,7 @@ mod test {
assert_eq!(entries.len(), max_tick_height as usize);
drop(entry_sender);
drop(retransmit_slots_sender);
broadcast_service
.broadcast_service
.join()

View File

@ -1,11 +1,11 @@
use super::broadcast_utils::{self, ReceiveResults};
use super::*;
use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
use solana_ledger::entry::Entry;
use solana_ledger::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::timing::duration_as_us;
use solana_ledger::{
entry::Entry,
shred::{Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK},
};
use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
use std::collections::HashMap;
use std::time::Duration;
@ -212,7 +212,8 @@ impl StandardBroadcastRun {
blockstore_sender.send(data_shreds.clone())?;
let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred]);
let coding_shreds = Arc::new(coding_shreds);
socket_sender.send((stakes, coding_shreds))?;
socket_sender.send((stakes, coding_shreds.clone()))?;
blockstore_sender.send(coding_shreds)?;
self.update_broadcast_stats(BroadcastStats {
shredding_elapsed: duration_as_us(&to_shreds_elapsed),
receive_elapsed: duration_as_us(&receive_elapsed),
@ -360,7 +361,6 @@ mod test {
};
use solana_runtime::bank::Bank;
use solana_sdk::{
clock::Slot,
genesis_config::GenesisConfig,
signature::{Keypair, Signer},
};

View File

@ -1,6 +1,7 @@
//! The `replay_stage` replays transactions broadcast by the leader.
use crate::{
broadcast_stage::RetransmitSlotsSender,
cluster_info::ClusterInfo,
cluster_info_vote_listener::VoteTracker,
commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData},
@ -47,6 +48,7 @@ use std::{
};
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
pub(crate) type ProgressMap = HashMap<Slot, ForkProgress>;
#[derive(PartialEq, Debug)]
@ -179,6 +181,7 @@ impl ReplayStage {
ledger_signal_receiver: Receiver<bool>,
poh_recorder: Arc<Mutex<PohRecorder>>,
_vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender,
) -> (Self, Receiver<Vec<Arc<Bank>>>) {
let ReplayStageConfig {
my_pubkey,
@ -208,6 +211,7 @@ impl ReplayStage {
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
.spawn(move || {
let _retransmit_slots_sender = retransmit_slots_sender;
let verify_recyclers = VerifyRecyclers::default();
let _exit = Finalizer::new(exit.clone());
let mut progress = HashMap::new();

View File

@ -3,7 +3,7 @@
use crate::{
banking_stage::BankingStage,
broadcast_stage::{BroadcastStage, BroadcastStageType},
broadcast_stage::{BroadcastStage, BroadcastStageType, RetransmitSlotsReceiver},
cluster_info::ClusterInfo,
cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker},
fetch_stage::FetchStage,
@ -39,6 +39,7 @@ impl Tpu {
cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
entry_receiver: Receiver<WorkingBankEntry>,
retransmit_slots_receiver: RetransmitSlotsReceiver,
transactions_sockets: Vec<UdpSocket>,
tpu_forwards_sockets: Vec<UdpSocket>,
broadcast_sockets: Vec<UdpSocket>,
@ -92,6 +93,7 @@ impl Tpu {
broadcast_sockets,
cluster_info.clone(),
entry_receiver,
retransmit_slots_receiver,
&exit,
blockstore,
shred_version,

View File

@ -4,6 +4,7 @@
use crate::{
accounts_hash_verifier::AccountsHashVerifier,
blockstream_service::BlockstreamService,
broadcast_stage::RetransmitSlotsSender,
cluster_info::ClusterInfo,
cluster_info_vote_listener::VoteTracker,
commitment::BlockCommitmentCache,
@ -100,6 +101,7 @@ impl Tvu {
rewards_recorder_sender: Option<RewardsRecorderSender>,
snapshot_package_sender: Option<SnapshotPackageSender>,
vote_tracker: Arc<VoteTracker>,
retransmit_slots_sender: RetransmitSlotsSender,
tvu_config: TvuConfig,
) -> Self {
let keypair: Arc<Keypair> = cluster_info
@ -196,6 +198,7 @@ impl Tvu {
ledger_signal_receiver,
poh_recorder.clone(),
vote_tracker,
retransmit_slots_sender,
);
let blockstream_service = if let Some(blockstream_unix_socket) = blockstream_unix_socket {
@ -298,6 +301,7 @@ pub mod tests {
let storage_keypair = Arc::new(Keypair::new());
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
let tvu = Tvu::new(
&voting_keypair.pubkey(),
Some(Arc::new(voting_keypair)),
@ -327,6 +331,7 @@ pub mod tests {
None,
None,
Arc::new(VoteTracker::new(&bank)),
retransmit_slots_sender,
TvuConfig::default(),
);
exit.store(true, Ordering::Relaxed);

View File

@ -380,6 +380,7 @@ impl Validator {
let vote_tracker = Arc::new({ VoteTracker::new(bank_forks.read().unwrap().root_bank()) });
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
let tvu = Tvu::new(
vote_account,
if config.voting_disabled {
@ -430,6 +431,7 @@ impl Validator {
rewards_recorder_sender,
snapshot_package_sender,
vote_tracker.clone(),
retransmit_slots_sender,
TvuConfig {
max_ledger_slots: config.max_ledger_slots,
sigverify_disabled: config.dev_sigverify_disabled,
@ -449,6 +451,7 @@ impl Validator {
&cluster_info,
&poh_recorder,
entry_receiver,
retransmit_slots_receiver,
node.sockets.tpu,
node.sockets.tpu_forwards,
node.sockets.broadcast,

View File

@ -13,7 +13,7 @@ use crate::{
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
rooted_slot_iterator::RootedSlotIterator,
shred::{Shred, Shredder},
shred::{Result as ShredResult, Shred, Shredder},
};
use bincode::deserialize;
use log::*;
@ -452,20 +452,24 @@ impl Blockstore {
/// Returns an iterator over the data shreds stored for `slot`, seeking from
/// shred `index` and yielding `((slot, shred_index), payload)` pairs until
/// the key's slot component moves past the requested slot.
///
/// # Errors
/// Propagates any database error raised while creating the iterator.
pub fn slot_data_iterator<'a>(
    &'a self,
    slot: Slot,
    index: u64,
) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + 'a> {
    // Seek directly to (slot, index); the old pre-`index` seek from (slot, 0)
    // was dead diff residue (shadowed binding) and has been removed.
    let slot_iterator = self.db.iter::<cf::ShredData>(IteratorMode::From(
        (slot, index),
        IteratorDirection::Forward,
    ))?;
    Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}
/// Returns an iterator over the coding shreds stored for `slot`, seeking from
/// shred `index` and yielding `((slot, shred_index), payload)` pairs until
/// the key's slot component moves past the requested slot.
///
/// # Errors
/// Propagates any database error raised while creating the iterator.
pub fn slot_coding_iterator<'a>(
    &'a self,
    slot: Slot,
    index: u64,
) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + 'a> {
    // Seek directly to (slot, index); the old pre-`index` seek from (slot, 0)
    // was dead diff residue (shadowed binding) and has been removed.
    let slot_iterator = self.db.iter::<cf::ShredCode>(IteratorMode::From(
        (slot, index),
        IteratorDirection::Forward,
    ))?;
    Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}
@ -1078,6 +1082,17 @@ impl Blockstore {
self.data_shred_cf.get_bytes((slot, index))
}
/// Collects every data shred stored for `slot` starting at `start_index`,
/// deserializing each raw payload into a `Shred`.
///
/// # Errors
/// Returns the first deserialization error encountered; panics if the
/// underlying iterator cannot be created.
pub fn get_data_shreds_for_slot(
    &self,
    slot: Slot,
    start_index: u64,
) -> ShredResult<Vec<Shred>> {
    let iterator = self
        .slot_data_iterator(slot, start_index)
        .expect("blockstore couldn't fetch iterator");
    iterator
        .map(|(_, payload)| Shred::new_from_serialized_shred(payload.to_vec()))
        .collect()
}
pub fn get_data_shreds(
&self,
slot: Slot,
@ -1127,6 +1142,17 @@ impl Blockstore {
self.code_shred_cf.get_bytes((slot, index))
}
/// Collects every coding shred stored for `slot` starting at `start_index`,
/// deserializing each raw payload into a `Shred`.
///
/// # Errors
/// Returns the first deserialization error encountered; panics if the
/// underlying iterator cannot be created.
pub fn get_coding_shreds_for_slot(
    &self,
    slot: Slot,
    start_index: u64,
) -> ShredResult<Vec<Shred>> {
    let iterator = self
        .slot_coding_iterator(slot, start_index)
        .expect("blockstore couldn't fetch iterator");
    iterator
        .map(|(_, payload)| Shred::new_from_serialized_shred(payload.to_vec()))
        .collect()
}
// Only used by tests
#[allow(clippy::too_many_arguments)]
pub fn write_entries(
@ -4385,12 +4411,12 @@ pub mod tests {
}
// Slot doesnt exist, iterator should be empty
let shred_iter = blockstore.slot_data_iterator(5).unwrap();
let shred_iter = blockstore.slot_data_iterator(5, 0).unwrap();
let result: Vec<_> = shred_iter.collect();
assert_eq!(result, vec![]);
// Test that the iterator for slot 8 contains what was inserted earlier
let shred_iter = blockstore.slot_data_iterator(8).unwrap();
let shred_iter = blockstore.slot_data_iterator(8, 0).unwrap();
let result: Vec<Shred> = shred_iter
.filter_map(|(_, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok())
.collect();
@ -5451,7 +5477,7 @@ pub mod tests {
let index = blockstore.get_index(slot).unwrap().unwrap();
// Test the set of data shreds in the index and in the data column
// family are the same
let data_iter = blockstore.slot_data_iterator(slot).unwrap();
let data_iter = blockstore.slot_data_iterator(slot, 0).unwrap();
let mut num_data = 0;
for ((slot, index), _) in data_iter {
num_data += 1;
@ -5464,7 +5490,7 @@ pub mod tests {
// Test the set of coding shreds in the index and in the coding column
// family are the same
let coding_iter = blockstore.slot_coding_iterator(slot).unwrap();
let coding_iter = blockstore.slot_coding_iterator(slot, 0).unwrap();
let mut num_coding = 0;
for ((slot, index), _) in coding_iter {
num_coding += 1;