diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 26eee54194..dc9752863b 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -1,30 +1,41 @@ //! A stage to broadcast data from a leader node to validators -use self::broadcast_fake_shreds_run::BroadcastFakeShredsRun; -use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun; -use self::standard_broadcast_run::StandardBroadcastRun; -use crate::cluster_info::{ClusterInfo, ClusterInfoError}; -use crate::poh_recorder::WorkingBankEntry; -use crate::result::{Error, Result}; -use solana_ledger::blockstore::Blockstore; -use solana_ledger::shred::Shred; -use solana_ledger::staking_utils; +use self::{ + broadcast_fake_shreds_run::BroadcastFakeShredsRun, + fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun, + standard_broadcast_run::StandardBroadcastRun, +}; +use crate::{ + cluster_info::{ClusterInfo, ClusterInfoError}, + poh_recorder::WorkingBankEntry, + result::{Error, Result}, +}; +use crossbeam_channel::{ + Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError, + Sender as CrossbeamSender, +}; +use solana_ledger::{blockstore::Blockstore, shred::Shred, staking_utils}; use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; -use solana_sdk::pubkey::Pubkey; -use std::collections::HashMap; -use std::net::UdpSocket; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender}; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread::{self, Builder, JoinHandle}; -use std::time::Instant; - -pub const NUM_INSERT_THREADS: usize = 2; +use solana_runtime::bank::Bank; +use solana_sdk::{clock::Slot, pubkey::Pubkey}; +use std::{ + collections::HashMap, + net::UdpSocket, + sync::atomic::{AtomicBool, Ordering}, + sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender}, + sync::{Arc, Mutex, RwLock}, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, +}; mod broadcast_fake_shreds_run; pub(crate) mod broadcast_utils; mod fail_entry_verification_broadcast_run; mod standard_broadcast_run; +pub const NUM_INSERT_THREADS: usize = 2; +pub type RetransmitSlotsSender = CrossbeamSender>>; +pub type RetransmitSlotsReceiver = CrossbeamReceiver>>; + #[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStageReturnType { ChannelDisconnected, @@ -43,6 +54,7 @@ impl BroadcastStageType { sock: Vec, cluster_info: Arc>, receiver: Receiver, + retransmit_slots_receiver: RetransmitSlotsReceiver, exit_sender: &Arc, blockstore: &Arc, shred_version: u16, @@ -53,6 +65,7 @@ impl BroadcastStageType { sock, cluster_info, receiver, + retransmit_slots_receiver, exit_sender, blockstore, StandardBroadcastRun::new(keypair, shred_version), @@ -62,6 +75,7 @@ impl BroadcastStageType { sock, cluster_info, receiver, + retransmit_slots_receiver, exit_sender, blockstore, FailEntryVerificationBroadcastRun::new(keypair, shred_version), @@ -71,6 +85,7 @@ impl BroadcastStageType { sock, cluster_info, receiver, + retransmit_slots_receiver, exit_sender, blockstore, BroadcastFakeShredsRun::new(keypair, 0, shred_version), @@ -79,7 +94,7 @@ impl BroadcastStageType { } } -type TransmitShreds = (Option>>, Arc>); +pub type TransmitShreds = (Option>>, Arc>); trait BroadcastRun { fn run( &mut self, @@ -135,25 +150,27 @@ impl BroadcastStage { loop { let res = broadcast_stage_run.run(blockstore, receiver, socket_sender, blockstore_sender); - let res = Self::handle_error(res); + let res = Self::handle_error(res, "run"); if let Some(res) = res { return res; } } } - fn handle_error(r: Result<()>) -> Option { + fn handle_error(r: Result<()>, name: &str) -> Option { if let Err(e) = r { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError - | Error::RecvError(RecvError) => { + | Error::RecvError(RecvError) + | Error::CrossbeamRecvTimeoutError(CrossbeamRecvTimeoutError::Disconnected) => { return Some(BroadcastStageReturnType::ChannelDisconnected); } - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::RecvTimeoutError(RecvTimeoutError::Timeout) + | Error::CrossbeamRecvTimeoutError(CrossbeamRecvTimeoutError::Timeout) => (), Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? _ => { inc_new_counter_error!("streamer-broadcaster-error", 1, 1); - error!("broadcaster error: {:?}", e); + error!("{} broadcaster error: {:?}", name, e); } } } @@ -180,6 +197,7 @@ impl BroadcastStage { socks: Vec, cluster_info: Arc>, receiver: Receiver, + retransmit_slots_receiver: RetransmitSlotsReceiver, exit_sender: &Arc, blockstore: &Arc, broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone, @@ -189,6 +207,8 @@ impl BroadcastStage { let (socket_sender, socket_receiver) = channel(); let (blockstore_sender, blockstore_receiver) = channel(); let bs_run = broadcast_stage_run.clone(); + + let socket_sender_ = socket_sender.clone(); let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { @@ -196,7 +216,7 @@ impl BroadcastStage { Self::run( &btree, &receiver, - &socket_sender, + &socket_sender_, &blockstore_sender, bs_run, ) @@ -212,7 +232,7 @@ impl BroadcastStage { .name("solana-broadcaster-transmit".to_string()) .spawn(move || loop { let res = bs_transmit.transmit(&socket_receiver, &cluster_info, &sock); - let res = Self::handle_error(res); + let res = Self::handle_error(res, "solana-broadcaster-transmit"); if let Some(res) = res { return res; } @@ -229,7 +249,7 @@ impl BroadcastStage { .name("solana-broadcaster-record".to_string()) .spawn(move || loop { let res = bs_record.record(&blockstore_receiver, &btree); - let res = Self::handle_error(res); + let res = Self::handle_error(res, "solana-broadcaster-record"); if let Some(res) = res { return res; } @@ -238,9 +258,68 @@ impl BroadcastStage { thread_hdls.push(t); } + let blockstore = blockstore.clone(); + let retransmit_thread = Builder::new() + .name("solana-broadcaster-retransmit".to_string()) + .spawn(move || loop { + if let Some(res) = Self::handle_error( + Self::check_retransmit_signals( + &blockstore, + &retransmit_slots_receiver, + &socket_sender, + ), + "solana-broadcaster-retransmit-check_retransmit_signals", + ) { + return res; + } + }) + .unwrap(); + + thread_hdls.push(retransmit_thread); Self { thread_hdls } } + fn check_retransmit_signals( + blockstore: &Blockstore, + retransmit_slots_receiver: &RetransmitSlotsReceiver, + socket_sender: &Sender, + ) -> Result<()> { + let timer = Duration::from_millis(100); + + // Check for a retransmit signal + let mut retransmit_slots = retransmit_slots_receiver.recv_timeout(timer)?; + while let Ok(new_retransmit_slots) = retransmit_slots_receiver.try_recv() { + retransmit_slots.extend(new_retransmit_slots); + } + + for (_, bank) in retransmit_slots.iter() { + let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); + let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); + let stakes = stakes.map(Arc::new); + let data_shreds = Arc::new( + blockstore + .get_data_shreds_for_slot(bank.slot(), 0) + .expect("My own shreds must be reconstructable"), + ); + + if !data_shreds.is_empty() { + socket_sender.send((stakes.clone(), data_shreds))?; + } + + let coding_shreds = Arc::new( + blockstore + .get_coding_shreds_for_slot(bank.slot(), 0) + .expect("My own shreds must be reconstructable"), + ); + + if !coding_shreds.is_empty() { + socket_sender.send((stakes.clone(), coding_shreds))?; + } + } + + Ok(()) + } + pub fn join(self) -> thread::Result { for thread_hdl in self.thread_hdls.into_iter() { let _ = thread_hdl.join(); @@ -250,22 +329,139 @@ impl BroadcastStage { } #[cfg(test)] -mod test { +pub mod test { use super::*; - use crate::cluster_info::{ClusterInfo, Node}; - use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use solana_ledger::entry::create_ticks; - use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; + use crate::{ + cluster_info::{ClusterInfo, Node}, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }; + use crossbeam_channel::unbounded; + use solana_ledger::{ + blockstore::{make_slot_entries, Blockstore}, + entry::create_ticks, + get_tmp_ledger_path, + shred::{max_ticks_per_n_shreds, Shredder, RECOMMENDED_FEC_RATE}, + }; use solana_runtime::bank::Bank; - use solana_sdk::hash::Hash; - use solana_sdk::pubkey::Pubkey; - use solana_sdk::signature::{Keypair, Signer}; - use std::path::Path; - use std::sync::atomic::AtomicBool; - use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; - use std::thread::sleep; - use std::time::Duration; + use solana_sdk::{ + hash::Hash, + pubkey::Pubkey, + signature::{Keypair, Signer}, + }; + use std::{ + path::Path, + sync::atomic::AtomicBool, + sync::mpsc::channel, + sync::{Arc, RwLock}, + thread::sleep, + }; + + pub fn make_transmit_shreds( + slot: Slot, + num: u64, + stakes: Option>>, + ) -> ( + Vec, + Vec, + Vec, + Vec, + ) { + let num_entries = max_ticks_per_n_shreds(num); + let (data_shreds, _) = make_slot_entries(slot, 0, num_entries); + let keypair = Arc::new(Keypair::new()); + let shredder = Shredder::new(slot, 0, RECOMMENDED_FEC_RATE, keypair, 0, 0) + .expect("Expected to create a new shredder"); + + let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..]); + ( + data_shreds.clone(), + coding_shreds.clone(), + data_shreds + .into_iter() + .map(|s| (stakes.clone(), Arc::new(vec![s]))) + .collect(), + coding_shreds + .into_iter() + .map(|s| (stakes.clone(), Arc::new(vec![s]))) + .collect(), + ) + } + + fn check_all_shreds_received( + transmit_receiver: &Receiver, + mut data_index: u64, + mut coding_index: u64, + num_expected_data_shreds: u64, + num_expected_coding_shreds: u64, + ) { + while let Ok(new_retransmit_slots) = transmit_receiver.try_recv() { + if new_retransmit_slots.1[0].is_data() { + for data_shred in new_retransmit_slots.1.iter() { + assert_eq!(data_shred.index() as u64, data_index); + data_index += 1; + } + } else { + assert_eq!(new_retransmit_slots.1[0].index() as u64, coding_index); + for coding_shred in new_retransmit_slots.1.iter() { + assert_eq!(coding_shred.index() as u64, coding_index); + coding_index += 1; + } + } + } + + assert_eq!(num_expected_data_shreds, data_index); + assert_eq!(num_expected_coding_shreds, coding_index); + } + + #[test] + fn test_duplicate_retransmit_signal() { + // Setup + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let (transmit_sender, transmit_receiver) = channel(); + let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000); + let bank0 = Arc::new(Bank::new(&genesis_config)); + + // Make some shreds + let updated_slot = 0; + let (all_data_shreds, all_coding_shreds, _, _all_coding_transmit_shreds) = + make_transmit_shreds(updated_slot, 10, None); + let num_data_shreds = all_data_shreds.len(); + let num_coding_shreds = all_coding_shreds.len(); + assert!(num_data_shreds >= 10); + + // Insert all the shreds + blockstore + .insert_shreds(all_data_shreds, None, true) + .unwrap(); + blockstore + .insert_shreds(all_coding_shreds, None, true) + .unwrap(); + + // Insert duplicate retransmit signal, blocks should + // only be retransmitted once + retransmit_slots_sender + .send(vec![(updated_slot, bank0.clone())].into_iter().collect()) + .unwrap(); + retransmit_slots_sender + .send(vec![(updated_slot, bank0.clone())].into_iter().collect()) + .unwrap(); + BroadcastStage::check_retransmit_signals( + &blockstore, + &retransmit_slots_receiver, + &transmit_sender, + ) + .unwrap(); + // Check all the data shreds were received only once + check_all_shreds_received( + &transmit_receiver, + 0, + 0, + num_data_shreds as u64, + num_coding_shreds as u64, + ); + } struct MockBroadcastStage { blockstore: Arc, @@ -277,6 +473,7 @@ mod test { leader_pubkey: &Pubkey, ledger_path: &Path, entry_receiver: Receiver, + retransmit_slots_receiver: RetransmitSlotsReceiver, ) -> MockBroadcastStage { // Make the database ledger let blockstore = Arc::new(Blockstore::open(ledger_path).unwrap()); @@ -304,6 +501,7 @@ mod test { leader_info.sockets.broadcast, cluster_info, entry_receiver, + retransmit_slots_receiver, &exit_sender, &blockstore, StandardBroadcastRun::new(leader_keypair, 0), @@ -326,10 +524,12 @@ mod test { let leader_keypair = Keypair::new(); let (entry_sender, entry_receiver) = channel(); + let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); let broadcast_service = setup_dummy_broadcast_service( &leader_keypair.pubkey(), &ledger_path, entry_receiver, + retransmit_slots_receiver, ); let start_tick_height; let max_tick_height; @@ -348,6 +548,7 @@ mod test { .expect("Expect successful send to broadcast service"); } } + sleep(Duration::from_millis(2000)); trace!( @@ -364,6 +565,7 @@ mod test { assert_eq!(entries.len(), max_tick_height as usize); drop(entry_sender); + drop(retransmit_slots_sender); broadcast_service .broadcast_service .join() diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 4e00953f81..2f25c09223 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -1,11 +1,11 @@ use super::broadcast_utils::{self, ReceiveResults}; use super::*; use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo; -use solana_ledger::entry::Entry; -use solana_ledger::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK}; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::Keypair; -use solana_sdk::timing::duration_as_us; +use solana_ledger::{ + entry::Entry, + shred::{Shred, Shredder, RECOMMENDED_FEC_RATE, SHRED_TICK_REFERENCE_MASK}, +}; +use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us}; use std::collections::HashMap; use std::time::Duration; @@ -212,7 +212,8 @@ impl StandardBroadcastRun { blockstore_sender.send(data_shreds.clone())?; let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred]); let coding_shreds = Arc::new(coding_shreds); - socket_sender.send((stakes, coding_shreds))?; + socket_sender.send((stakes, coding_shreds.clone()))?; + blockstore_sender.send(coding_shreds)?; self.update_broadcast_stats(BroadcastStats { shredding_elapsed: duration_as_us(&to_shreds_elapsed), receive_elapsed: duration_as_us(&receive_elapsed), @@ -360,7 +361,6 @@ mod test { }; use solana_runtime::bank::Bank; use solana_sdk::{ - clock::Slot, genesis_config::GenesisConfig, signature::{Keypair, Signer}, }; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index f31a543e26..c2a85dac49 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1,6 +1,7 @@ //! The `replay_stage` replays transactions broadcast by the leader. use crate::{ + broadcast_stage::RetransmitSlotsSender, cluster_info::ClusterInfo, cluster_info_vote_listener::VoteTracker, commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData}, @@ -47,6 +48,7 @@ use std::{ }; pub const MAX_ENTRY_RECV_PER_ITER: usize = 512; + pub(crate) type ProgressMap = HashMap; #[derive(PartialEq, Debug)] @@ -179,6 +181,7 @@ impl ReplayStage { ledger_signal_receiver: Receiver, poh_recorder: Arc>, _vote_tracker: Arc, + retransmit_slots_sender: RetransmitSlotsSender, ) -> (Self, Receiver>>) { let ReplayStageConfig { my_pubkey, @@ -208,6 +211,7 @@ impl ReplayStage { let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { + let _retransmit_slots_sender = retransmit_slots_sender; let verify_recyclers = VerifyRecyclers::default(); let _exit = Finalizer::new(exit.clone()); let mut progress = HashMap::new(); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 8a0f572d6a..09e7a5915c 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -3,7 +3,7 @@ use crate::{ banking_stage::BankingStage, - broadcast_stage::{BroadcastStage, BroadcastStageType}, + broadcast_stage::{BroadcastStage, BroadcastStageType, RetransmitSlotsReceiver}, cluster_info::ClusterInfo, cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker}, fetch_stage::FetchStage, @@ -39,6 +39,7 @@ impl Tpu { cluster_info: &Arc>, poh_recorder: &Arc>, entry_receiver: Receiver, + retransmit_slots_receiver: RetransmitSlotsReceiver, transactions_sockets: Vec, tpu_forwards_sockets: Vec, broadcast_sockets: Vec, @@ -92,6 +93,7 @@ impl Tpu { broadcast_sockets, cluster_info.clone(), entry_receiver, + retransmit_slots_receiver, &exit, blockstore, shred_version, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 275a2b32d3..3df71576e1 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -4,6 +4,7 @@ use crate::{ accounts_hash_verifier::AccountsHashVerifier, blockstream_service::BlockstreamService, + broadcast_stage::RetransmitSlotsSender, cluster_info::ClusterInfo, cluster_info_vote_listener::VoteTracker, commitment::BlockCommitmentCache, @@ -100,6 +101,7 @@ impl Tvu { rewards_recorder_sender: Option, snapshot_package_sender: Option, vote_tracker: Arc, + retransmit_slots_sender: RetransmitSlotsSender, tvu_config: TvuConfig, ) -> Self { let keypair: Arc = cluster_info @@ -196,6 +198,7 @@ impl Tvu { ledger_signal_receiver, poh_recorder.clone(), vote_tracker, + retransmit_slots_sender, ); let blockstream_service = if let Some(blockstream_unix_socket) = blockstream_unix_socket { @@ -298,6 +301,7 @@ pub mod tests { let storage_keypair = Arc::new(Keypair::new()); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); + let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(Arc::new(voting_keypair)), @@ -327,6 +331,7 @@ pub mod tests { None, None, Arc::new(VoteTracker::new(&bank)), + retransmit_slots_sender, TvuConfig::default(), ); exit.store(true, Ordering::Relaxed); diff --git a/core/src/validator.rs b/core/src/validator.rs index 374bdd6d1e..300c4aa3a6 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -380,6 +380,7 @@ impl Validator { let vote_tracker = Arc::new({ VoteTracker::new(bank_forks.read().unwrap().root_bank()) }); + let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded(); let tvu = Tvu::new( vote_account, if config.voting_disabled { @@ -430,6 +431,7 @@ impl Validator { rewards_recorder_sender, snapshot_package_sender, vote_tracker.clone(), + retransmit_slots_sender, TvuConfig { max_ledger_slots: config.max_ledger_slots, sigverify_disabled: config.dev_sigverify_disabled, @@ -449,6 +451,7 @@ impl Validator { &cluster_info, &poh_recorder, entry_receiver, + retransmit_slots_receiver, node.sockets.tpu, node.sockets.tpu_forwards, node.sockets.broadcast, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 954bfe28dd..0a57b4657b 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -13,7 +13,7 @@ use crate::{ leader_schedule_cache::LeaderScheduleCache, next_slots_iterator::NextSlotsIterator, rooted_slot_iterator::RootedSlotIterator, - shred::{Shred, Shredder}, + shred::{Result as ShredResult, Shred, Shredder}, }; use bincode::deserialize; use log::*; @@ -452,20 +452,24 @@ impl Blockstore { pub fn slot_data_iterator<'a>( &'a self, slot: Slot, + index: u64, ) -> Result)> + 'a> { - let slot_iterator = self - .db - .iter::(IteratorMode::From((slot, 0), IteratorDirection::Forward))?; + let slot_iterator = self.db.iter::(IteratorMode::From( + (slot, index), + IteratorDirection::Forward, + ))?; Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot)) } pub fn slot_coding_iterator<'a>( &'a self, slot: Slot, + index: u64, ) -> Result)> + 'a> { - let slot_iterator = self - .db - .iter::(IteratorMode::From((slot, 0), IteratorDirection::Forward))?; + let slot_iterator = self.db.iter::(IteratorMode::From( + (slot, index), + IteratorDirection::Forward, + ))?; Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot)) } @@ -1078,6 +1082,17 @@ impl Blockstore { self.data_shred_cf.get_bytes((slot, index)) } + pub fn get_data_shreds_for_slot( + &self, + slot: Slot, + start_index: u64, + ) -> ShredResult> { + self.slot_data_iterator(slot, start_index) + .expect("blockstore couldn't fetch iterator") + .map(|data| Shred::new_from_serialized_shred(data.1.to_vec())) + .collect() + } + pub fn get_data_shreds( &self, slot: Slot, @@ -1127,6 +1142,17 @@ impl Blockstore { self.code_shred_cf.get_bytes((slot, index)) } + pub fn get_coding_shreds_for_slot( + &self, + slot: Slot, + start_index: u64, + ) -> ShredResult> { + self.slot_coding_iterator(slot, start_index) + .expect("blockstore couldn't fetch iterator") + .map(|code| Shred::new_from_serialized_shred(code.1.to_vec())) + .collect() + } + // Only used by tests #[allow(clippy::too_many_arguments)] pub fn write_entries( @@ -4385,12 +4411,12 @@ pub mod tests { } // Slot doesnt exist, iterator should be empty - let shred_iter = blockstore.slot_data_iterator(5).unwrap(); + let shred_iter = blockstore.slot_data_iterator(5, 0).unwrap(); let result: Vec<_> = shred_iter.collect(); assert_eq!(result, vec![]); // Test that the iterator for slot 8 contains what was inserted earlier - let shred_iter = blockstore.slot_data_iterator(8).unwrap(); + let shred_iter = blockstore.slot_data_iterator(8, 0).unwrap(); let result: Vec = shred_iter .filter_map(|(_, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok()) .collect(); @@ -5451,7 +5477,7 @@ pub mod tests { let index = blockstore.get_index(slot).unwrap().unwrap(); // Test the set of data shreds in the index and in the data column // family are the same - let data_iter = blockstore.slot_data_iterator(slot).unwrap(); + let data_iter = blockstore.slot_data_iterator(slot, 0).unwrap(); let mut num_data = 0; for ((slot, index), _) in data_iter { num_data += 1; @@ -5464,7 +5490,7 @@ pub mod tests { // Test the set of coding shreds in the index and in the coding column // family are the same - let coding_iter = blockstore.slot_coding_iterator(slot).unwrap(); + let coding_iter = blockstore.slot_coding_iterator(slot, 0).unwrap(); let mut num_coding = 0; for ((slot, index), _) in coding_iter { num_coding += 1;