From 575a0e318b7a294e79d6d3428a778308f0cfa2c8 Mon Sep 17 00:00:00 2001
From: carllin
Date: Thu, 9 May 2019 14:10:04 -0700
Subject: [PATCH] Add newly completed slots signal to Blocktree (#4225)

* Add channel to blocktree for communicating when slots are completed

* Refactor RepairService options into a RepairStrategy
---
 core/src/blocktree.rs        | 193 ++++++++++++++++++++++++++++++++---
 core/src/fullnode.rs         |  20 +++-
 core/src/repair_service.rs   |  62 +++++++----
 core/src/replicator.rs       |   5 +-
 core/src/retransmit_stage.rs |  13 ++-
 core/src/tvu.rs              |  10 +-
 core/src/window_service.rs   |  46 ++++++---
 core/tests/tvu.rs            |  11 +-
 8 files changed, 289 insertions(+), 71 deletions(-)

diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index dbd6a53f4d..81e04c0d1b 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -28,7 +28,7 @@ use std::cmp;
 use std::fs;
 use std::io;
 use std::rc::Rc;
-use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
+use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
 use std::sync::{Arc, RwLock};
 
 pub use self::meta::*;
@@ -63,6 +63,10 @@ db_imports! {rocks, Rocks, "rocksdb"}
 #[cfg(feature = "kvstore")]
 db_imports! {kvs, Kvs, "kvstore"}
 
+pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
+
+pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
+
 #[derive(Debug)]
 pub enum BlocktreeError {
     BlobForIndexExists,
@@ -83,6 +87,7 @@ pub struct Blocktree {
     batch_processor: Arc<RwLock<BatchProcessor>>,
     session: Arc<erasure::Session>,
     pub new_blobs_signals: Vec<SyncSender<bool>>,
+    pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
 }
 
 // Column family for metadata about a leader slot
@@ -141,15 +146,21 @@ impl Blocktree {
             session,
             new_blobs_signals: vec![],
             batch_processor,
+            completed_slots_senders: vec![],
         })
     }
 
-    pub fn open_with_signal(ledger_path: &str) -> Result<(Self, Receiver<bool>)> {
+    pub fn open_with_signal(
+        ledger_path: &str,
+    ) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> {
         let mut blocktree = Self::open(ledger_path)?;
         let (signal_sender, signal_receiver) = sync_channel(1);
+        let (completed_slots_sender, completed_slots_receiver) =
+            sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
         blocktree.new_blobs_signals = vec![signal_sender];
+        blocktree.completed_slots_senders = vec![completed_slots_sender];
 
-        Ok((blocktree, signal_receiver))
+        Ok((blocktree, signal_receiver, completed_slots_receiver))
     }
 
     pub fn destroy(ledger_path: &str) -> Result<()> {
@@ -340,11 +351,17 @@ impl Blocktree {
         // Handle chaining for the working set
         handle_chaining(&db, &mut write_batch, &slot_meta_working_set)?;
         let mut should_signal = false;
+        let mut newly_completed_slots = vec![];
 
         // Check if any metadata was changed, if so, insert the new version of the
        // metadata into the write batch
         for (slot, (meta, meta_backup)) in slot_meta_working_set.iter() {
             let meta: &SlotMeta = &RefCell::borrow(&*meta);
+            if !self.completed_slots_senders.is_empty()
+                && is_newly_completed_slot(meta, meta_backup)
+            {
+                newly_completed_slots.push(*slot);
+            }
             // Check if the working copy of the metadata has changed
             if Some(meta) != meta_backup.as_ref() {
                 should_signal = should_signal || slot_has_updates(meta, &meta_backup);
                 write_batch.put::<cf::SlotMeta>(*slot, meta)?;
             }
         }
 
         for ((slot, set_index), erasure_meta) in erasure_meta_working_set.iter() {
             write_batch.put::<cf::ErasureMeta>((*slot, *set_index), erasure_meta)?;
         }
 
+        batch_processor.write(write_batch)?;
+
         if should_signal {
-            for signal in self.new_blobs_signals.iter() {
+            for signal in &self.new_blobs_signals {
                 let _ = signal.try_send(true);
             }
         }
 
-        batch_processor.write(write_batch)?;
+        if !self.completed_slots_senders.is_empty() && !newly_completed_slots.is_empty() {
+            let mut slots: Vec<_> = (0..self.completed_slots_senders.len() - 1)
+                .map(|_| newly_completed_slots.clone())
+                .collect();
+
+            slots.push(newly_completed_slots);
+
+            for (signal, slots) in self.completed_slots_senders.iter().zip(slots.into_iter()) {
+                let res = signal.try_send(slots);
+                if let Err(TrySendError::Full(_)) = res {
+                    solana_metrics::submit(
+                        solana_metrics::influxdb::Point::new("blocktree_error")
+                            .add_field(
+                                "error",
+                                solana_metrics::influxdb::Value::String(
+                                    "Unable to send newly completed slot because channel is full"
+                                        .to_string(),
+                                ),
+                            )
+                            .to_owned(),
+                    );
+                }
+            }
+        }
 
         Ok(())
     }
@@ -880,7 +922,7 @@ fn insert_data_blob<'a>(
     slot_meta.received = cmp::max(blob_index + 1, slot_meta.received);
     slot_meta.consumed = new_consumed;
     slot_meta.last_index = {
-        // If the last slot hasn't been set before, then
+        // If the last index in the slot hasn't been set before, then
         // set it to this blob index
         if slot_meta.last_index == std::u64::MAX {
             if blob_to_insert.is_last_in_slot() {
@@ -1123,9 +1165,8 @@ fn handle_chaining_for_slot(
         .expect("Slot must exist in the working_set hashmap");
 
     {
-        let is_orphaned = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
-
         let mut meta_mut = meta.borrow_mut();
+        let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
 
         // If:
         // 1) This is a new slot
@@ -1137,27 +1178,32 @@ fn handle_chaining_for_slot(
             // Check if the slot represented by meta_mut is either a new slot or an orphan.
             // In both cases we need to run the chaining logic b/c the parent on the slot was
             // previously unknown.
-            if meta_backup.is_none() || is_orphaned {
+            if meta_backup.is_none() || was_orphan_slot {
                 let prev_slot_meta =
                     find_slot_meta_else_create(db, working_set, new_chained_slots, prev_slot)?;
 
-                // This is a newly inserted slot so run the chaining logic
+                // This is a newly inserted slot/orphan so run the chaining logic to link it to a
+                // newly discovered parent
                 chain_new_slot_to_prev_slot(&mut prev_slot_meta.borrow_mut(), slot, &mut meta_mut);
 
+                // If the parent of `slot` is a newly inserted orphan, insert it into the orphans
+                // column family
                 if is_orphan(&RefCell::borrow(&*prev_slot_meta)) {
                     write_batch.put::<cf::Orphans>(prev_slot, &true)?;
                 }
             }
         }
 
-        // At this point this slot has received a parent, so no longer a orphan
-        if is_orphaned {
+        // At this point this slot has received a parent, so it's no longer an orphan
+        if was_orphan_slot {
             write_batch.delete::<cf::Orphans>(slot)?;
         }
     }
 
-    // This is a newly inserted slot and slot.is_connected is true, so update all
-    // child slots so that their `is_connected` = true
+    // If this is a newly inserted slot, then we know the children of this slot were not previously
+    // connected to the trunk of the ledger. Thus, if slot.is_connected is now true, we need to
+    // update all child slots with `is_connected` = true because these children are also now newly
+    // connected to the trunk of the ledger
     let should_propagate_is_connected =
         is_newly_completed_slot(&RefCell::borrow(&*meta), meta_backup)
             && RefCell::borrow(&*meta).is_connected;
 
@@ -1238,7 +1284,6 @@ fn chain_new_slot_to_prev_slot(
 fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option<SlotMeta>) -> bool {
     slot_meta.is_full()
         && (backup_slot_meta.is_none()
-            || is_orphan(&backup_slot_meta.as_ref().unwrap())
             || slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed)
 }
 
@@ -2112,7 +2157,7 @@
     pub fn test_new_blobs_signal() {
         // Initialize ledger
         let ledger_path = get_tmp_ledger_path("test_new_blobs_signal");
-        let (ledger, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
+        let (ledger, recvr, _) = Blocktree::open_with_signal(&ledger_path).unwrap();
         let ledger = Arc::new(ledger);
 
         let entries_per_slot = 10;
@@ -2188,6 +2233,98 @@
         Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    pub fn test_completed_blobs_signal() {
+        // Initialize ledger
+        let ledger_path = get_tmp_ledger_path("test_completed_blobs_signal");
+        let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
+        let ledger = Arc::new(ledger);
+
+        let entries_per_slot = 10;
+
+        // Create blobs for slot 0
+        let (blobs, _) = make_slot_entries(0, 0, entries_per_slot);
+
+        // Insert all but the first blob in the slot, should not be considered complete
+        ledger
+            .insert_data_blobs(&blobs[1..entries_per_slot as usize])
+            .unwrap();
+        assert!(recvr.try_recv().is_err());
+
+        // Insert first blob, slot should now be considered complete
+        ledger.insert_data_blobs(once(&blobs[0])).unwrap();
+        assert_eq!(recvr.try_recv().unwrap(), vec![0]);
+    }
+
+    #[test]
+    pub fn test_completed_blobs_signal_orphans() {
+        // Initialize ledger
+        let ledger_path = get_tmp_ledger_path("test_completed_blobs_signal_orphans");
+        let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
+        let ledger = Arc::new(ledger);
+
+        let entries_per_slot = 10;
+        let slots = vec![2, 5, 10];
+        let all_blobs = make_chaining_slot_entries(&slots[..], entries_per_slot);
+
+        // Get the blobs for slot 5 chaining to slot 2
+        let (ref orphan_blobs, _) = all_blobs[1];
+
+        // Get the blobs for slot 10, chaining to slot 5
+        let (ref orphan_child, _) = all_blobs[2];
+
+        // Insert all but the first blob in the slot, should not be considered complete
+        ledger
+            .insert_data_blobs(&orphan_child[1..entries_per_slot as usize])
+            .unwrap();
+        assert!(recvr.try_recv().is_err());
+
+        // Insert first blob, slot should now be considered complete
+        ledger.insert_data_blobs(once(&orphan_child[0])).unwrap();
+        assert_eq!(recvr.try_recv().unwrap(), vec![slots[2]]);
+
+        // Insert the blobs for the orphan_slot
+        ledger
+            .insert_data_blobs(&orphan_blobs[1..entries_per_slot as usize])
+            .unwrap();
+        assert!(recvr.try_recv().is_err());
+
+        // Insert first blob, slot should now be considered complete
+        ledger.insert_data_blobs(once(&orphan_blobs[0])).unwrap();
+        assert_eq!(recvr.try_recv().unwrap(), vec![slots[1]]);
+    }
+
+    #[test]
+    pub fn test_completed_blobs_signal_many() {
+        // Initialize ledger
+        let ledger_path = get_tmp_ledger_path("test_completed_blobs_signal_many");
+        let (ledger, _, recvr) = Blocktree::open_with_signal(&ledger_path).unwrap();
+        let ledger = Arc::new(ledger);
+
+        let entries_per_slot = 10;
+        let mut slots = vec![2, 5, 10];
+        let all_blobs = make_chaining_slot_entries(&slots[..], entries_per_slot);
+        let disconnected_slot = 4;
+
+        let (ref blobs0, _) = all_blobs[0];
+        let (ref blobs1, _) = all_blobs[1];
+        let (ref blobs2, _) = all_blobs[2];
+        let (ref blobs3, _) = make_slot_entries(disconnected_slot, 1, entries_per_slot);
+
+        let mut all_blobs: Vec<_> = vec![blobs0, blobs1, blobs2, blobs3]
+            .into_iter()
+            .flatten()
+            .collect();
+
+        all_blobs.shuffle(&mut thread_rng());
+        ledger.insert_data_blobs(all_blobs).unwrap();
+        let mut result = recvr.try_recv().unwrap();
+        result.sort();
+        slots.push(disconnected_slot);
+        slots.sort();
+        assert_eq!(result, slots);
+    }
+
     #[test]
     pub fn test_handle_chaining_basic() {
         let blocktree_path = get_tmp_ledger_path("test_handle_chaining_basic");
@@ -3375,4 +3512,28 @@
 
         (blobs, entries)
     }
+
+    // Create blobs for slots that have a parent-child relationship defined by the input `chain`
+    pub fn make_chaining_slot_entries(
+        chain: &[u64],
+        entries_per_slot: u64,
+    ) -> Vec<(Vec<Blob>, Vec<Entry>)> {
+        let mut slots_blobs_and_entries = vec![];
+        for (i, slot) in chain.iter().enumerate() {
+            let parent_slot = {
+                if *slot == 0 {
+                    0
+                } else if i == 0 {
+                    std::u64::MAX
+                } else {
+                    chain[i - 1]
+                }
+            };
+
+            let result = make_slot_entries(*slot, parent_slot, entries_per_slot);
+            slots_blobs_and_entries.push(result);
+        }
+
+        slots_blobs_and_entries
+    }
 }
diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs
index 687090058b..b291f33d59 100644
--- a/core/src/fullnode.rs
+++ b/core/src/fullnode.rs
@@ -1,7 +1,7 @@
 //! The `fullnode` module hosts all the fullnode microservices.
 
 use crate::bank_forks::BankForks;
-use crate::blocktree::Blocktree;
+use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
 use crate::blocktree_processor::{self, BankForksInfo};
 use crate::cluster_info::{ClusterInfo, Node};
 use crate::contact_info::ContactInfo;
@@ -95,8 +95,14 @@
         let id = keypair.pubkey();
         assert_eq!(id, node.info.id);
 
-        let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) =
-            new_banks_from_blocktree(ledger_path, config.account_paths.clone());
+        let (
+            bank_forks,
+            bank_forks_info,
+            blocktree,
+            ledger_signal_receiver,
+            completed_slots_receiver,
+            leader_schedule_cache,
+        ) = new_banks_from_blocktree(ledger_path, config.account_paths.clone());
 
         let leader_schedule_cache = Arc::new(leader_schedule_cache);
         let exit = Arc::new(AtomicBool::new(false));
@@ -236,6 +242,7 @@
             &leader_schedule_cache,
             &exit,
             &genesis_blockhash,
+            completed_slots_receiver,
         );
 
         if config.sigverify_disabled {
@@ -290,13 +297,15 @@ pub fn new_banks_from_blocktree(
     Vec<BankForksInfo>,
     Blocktree,
     Receiver<bool>,
+    CompletedSlotsReceiver,
     LeaderScheduleCache,
 ) {
     let genesis_block =
         GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block");
 
-    let (blocktree, ledger_signal_receiver) = Blocktree::open_with_signal(blocktree_path)
-        .expect("Expected to successfully open database ledger");
+    let (blocktree, ledger_signal_receiver, completed_slots_receiver) =
+        Blocktree::open_with_signal(blocktree_path)
+            .expect("Expected to successfully open database ledger");
 
     let (bank_forks, bank_forks_info, leader_schedule_cache) =
         blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths)
@@ -307,6 +316,7 @@ pub fn new_banks_from_blocktree(
         bank_forks_info,
         blocktree,
         ledger_signal_receiver,
+        completed_slots_receiver,
         leader_schedule_cache,
     )
 }
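
The blocktree half of this patch hinges on one property: completed-slot notifications are fire-and-forget, so a slow consumer can never stall `insert_data_blobs`. The standalone sketch below (not patch code; `signal_completed_slots`, the tiny bound, and the drop counter are invented for illustration) shows the same bounded `sync_channel` plus `try_send` behavior, with overflow merely counted where the patch submits a `blocktree_error` metric instead:

use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

// Stand-in for MAX_COMPLETED_SLOTS_IN_CHANNEL; a small bound makes the
// overflow path easy to exercise.
const CHANNEL_BOUND: usize = 2;

// Send a batch of newly completed slots without ever blocking the ledger
// writer; on overflow the batch is dropped and counted.
fn signal_completed_slots(
    sender: &SyncSender<Vec<u64>>,
    batch: Vec<u64>,
    dropped_batches: &mut u64,
) {
    if let Err(TrySendError::Full(_)) = sender.try_send(batch) {
        *dropped_batches += 1;
    }
}

fn main() {
    let (sender, receiver): (SyncSender<Vec<u64>>, Receiver<Vec<u64>>) =
        sync_channel(CHANNEL_BOUND);
    let mut dropped = 0;

    // Three batches against a bound of two: the third is dropped, but the
    // sender never blocks, which is the property insert_data_blobs needs.
    for slot in 0..3u64 {
        signal_completed_slots(&sender, vec![slot], &mut dropped);
    }

    let received: Vec<Vec<u64>> = receiver.try_iter().collect();
    assert_eq!(received, vec![vec![0], vec![1]]);
    assert_eq!(dropped, 1);
}
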
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 2228e0b612..891612e3cd 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -2,7 +2,7 @@
 //! regularly finds missing blobs in the ledger and sends repair requests for those blobs
 
 use crate::bank_forks::BankForks;
-use crate::blocktree::{Blocktree, SlotMeta};
+use crate::blocktree::{Blocktree, CompletedSlotsReceiver, SlotMeta};
 use crate::cluster_info::ClusterInfo;
 use crate::result::Result;
 use crate::service::Service;
@@ -22,6 +22,14 @@ pub const MAX_REPAIR_TRIES: u64 = 128;
 pub const NUM_FORKS_TO_REPAIR: usize = 5;
 pub const MAX_ORPHANS: usize = 5;
 
+pub enum RepairStrategy {
+    RepairRange(RepairSlotRange),
+    RepairAll {
+        bank_forks: Arc<RwLock<BankForks>>,
+        completed_slots_receiver: CompletedSlotsReceiver,
+    },
+}
+
 #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
 pub enum RepairType {
     Orphan(u64),
@@ -68,8 +76,7 @@ impl RepairService {
         exit: &Arc<AtomicBool>,
         repair_socket: Arc<UdpSocket>,
         cluster_info: Arc<RwLock<ClusterInfo>>,
-        bank_forks: Option<Arc<RwLock<BankForks>>>,
-        repair_slot_range: Option<RepairSlotRange>,
+        repair_strategy: RepairStrategy,
     ) -> Self {
         let exit = exit.clone();
         let t_repair = Builder::new()
@@ -80,8 +87,7 @@ impl RepairService {
                     exit,
                     &repair_socket,
                     &cluster_info,
-                    &bank_forks,
-                    repair_slot_range,
+                    repair_strategy,
                 )
             })
             .unwrap();
@@ -94,8 +100,7 @@ impl RepairService {
         exit: Arc<AtomicBool>,
         repair_socket: &Arc<UdpSocket>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        bank_forks: &Option<Arc<RwLock<BankForks>>>,
-        repair_slot_range: Option<RepairSlotRange>,
+        repair_strategy: RepairStrategy,
     ) {
         let mut repair_info = RepairInfo::new();
         let epoch_slots: HashSet<u64> = HashSet::new();
@@ -106,20 +111,30 @@ impl RepairService {
             }
 
             let repairs = {
-                if let Some(ref repair_slot_range) = repair_slot_range {
-                    // Strategy used by replicators
-                    Self::generate_repairs_in_range(
-                        blocktree,
-                        MAX_REPAIR_LENGTH,
-                        &mut repair_info,
-                        repair_slot_range,
-                    )
-                } else {
-                    let bank_forks = bank_forks
-                        .as_ref()
-                        .expect("Non-replicator repair strategy missing BankForks");
-                    Self::update_fast_repair(id, &epoch_slots, &cluster_info, bank_forks);
-                    Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH)
+                match repair_strategy {
+                    RepairStrategy::RepairRange(ref repair_slot_range) => {
+                        // Strategy used by replicators
+                        Self::generate_repairs_in_range(
+                            blocktree,
+                            MAX_REPAIR_LENGTH,
+                            &mut repair_info,
+                            repair_slot_range,
+                        )
+                    }
+
+                    RepairStrategy::RepairAll {
+                        ref bank_forks,
+                        ref completed_slots_receiver,
+                    } => {
+                        Self::update_epoch_slots(
+                            id,
+                            &epoch_slots,
+                            &cluster_info,
+                            bank_forks,
+                            completed_slots_receiver,
+                        );
+                        Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH)
+                    }
                 }
             };
 
@@ -278,11 +293,14 @@ impl RepairService {
         }
     }
 
-    fn update_fast_repair(
+    // Update the gossiped structure used for the "Repairmen" repair protocol. See book
+    // for details.
+    fn update_epoch_slots(
         id: Pubkey,
         slots: &HashSet<u64>,
         cluster_info: &RwLock<ClusterInfo>,
         bank_forks: &Arc<RwLock<BankForks>>,
+        _completed_slots_receiver: &CompletedSlotsReceiver,
     ) {
         let root = bank_forks.read().unwrap().root();
         cluster_info
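
To see what the `RepairStrategy` refactor buys, here is a compilable miniature of the dispatch above, with the Solana types stubbed out (everything below is illustrative, not patch code): a single enum replaces the old `Option<Arc<RwLock<BankForks>>>` / `Option<RepairSlotRange>` pair, so "replicators repair a range, validators repair everything and need `BankForks`" is enforced by the type system rather than by an `expect()` at runtime:

use std::ops::RangeInclusive;

// Illustrative stand-ins; the real types carry much more state.
struct BankForks {
    root: u64,
}
type RepairSlotRange = RangeInclusive<u64>;

enum RepairStrategy {
    // Replicators repair only the ledger segment they store.
    RepairRange(RepairSlotRange),
    // Validators repair all forks growing from the current root.
    RepairAll { bank_forks: BankForks },
}

fn generate_repairs(strategy: &RepairStrategy) -> Vec<u64> {
    match strategy {
        RepairStrategy::RepairRange(range) => range.clone().collect(),
        RepairStrategy::RepairAll { bank_forks } => {
            // The real code walks blocktree slot metas; pretend the two
            // slots past the root are missing.
            (bank_forks.root + 1..=bank_forks.root + 2).collect()
        }
    }
}

fn main() {
    let replicator = RepairStrategy::RepairRange(3..=5);
    let validator = RepairStrategy::RepairAll {
        bank_forks: BankForks { root: 7 },
    };
    assert_eq!(generate_repairs(&replicator), vec![3, 4, 5]);
    assert_eq!(generate_repairs(&validator), vec![8, 9]);
}
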
diff --git a/core/src/replicator.rs b/core/src/replicator.rs
index 8e33db522d..2e27ee1c50 100644
--- a/core/src/replicator.rs
+++ b/core/src/replicator.rs
@@ -6,7 +6,7 @@ use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
 use crate::contact_info::ContactInfo;
 use crate::gossip_service::GossipService;
 use crate::packet::to_shared_blob;
-use crate::repair_service::RepairSlotRange;
+use crate::repair_service::{RepairSlotRange, RepairStrategy};
 use crate::result::Result;
 use crate::service::Service;
 use crate::storage_stage::SLOTS_PER_SEGMENT;
@@ -232,14 +232,13 @@
 
         let window_service = WindowService::new(
             None, //TODO: need a way to validate blobs... https://github.com/solana-labs/solana/issues/3924
-            None, //TODO: see above ^
             blocktree.clone(),
             cluster_info.clone(),
             blob_fetch_receiver,
             retransmit_sender,
             repair_socket,
             &exit,
-            Some(repair_slot_range),
+            RepairStrategy::RepairRange(repair_slot_range),
             &Hash::default(),
         );
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index e73a02e87b..769c8376a7 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -1,9 +1,10 @@
 //! The `retransmit_stage` retransmits blobs between validators
 
 use crate::bank_forks::BankForks;
-use crate::blocktree::Blocktree;
+use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
 use crate::cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT};
 use crate::leader_schedule_cache::LeaderScheduleCache;
+use crate::repair_service::RepairStrategy;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::staking_utils;
@@ -108,6 +109,7 @@ pub struct RetransmitStage {
 
 impl RetransmitStage {
     #[allow(clippy::new_ret_no_self)]
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
         bank_forks: Arc<RwLock<BankForks>>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
@@ -118,6 +120,7 @@ impl RetransmitStage {
         fetch_stage_receiver: BlobReceiver,
         exit: &Arc<AtomicBool>,
         genesis_blockhash: &Hash,
+        completed_slots_receiver: CompletedSlotsReceiver,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
 
@@ -128,8 +131,12 @@ impl RetransmitStage {
             cluster_info.clone(),
             retransmit_receiver,
         );
+
+        let repair_strategy = RepairStrategy::RepairAll {
+            bank_forks,
+            completed_slots_receiver,
+        };
         let window_service = WindowService::new(
-            Some(bank_forks),
             Some(leader_schedule_cache.clone()),
             blocktree,
             cluster_info.clone(),
@@ -137,7 +144,7 @@ impl RetransmitStage {
             retransmit_sender,
             repair_socket,
             exit,
-            None,
+            repair_strategy,
             genesis_blockhash,
         );
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 9f4736404f..dac132691d 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -15,7 +15,7 @@
 use crate::bank_forks::BankForks;
 use crate::blob_fetch_stage::BlobFetchStage;
 use crate::blockstream_service::BlockstreamService;
-use crate::blocktree::Blocktree;
+use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
 use crate::cluster_info::ClusterInfo;
 use crate::leader_schedule_cache::LeaderScheduleCache;
 use crate::poh_recorder::PohRecorder;
@@ -71,6 +71,7 @@ impl Tvu {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         exit: &Arc<AtomicBool>,
         genesis_blockhash: &Hash,
+        completed_slots_receiver: CompletedSlotsReceiver,
     ) -> Self
     where
         T: 'static + KeypairUtil + Sync + Send,
@@ -108,6 +109,7 @@ impl Tvu {
             blob_fetch_receiver,
             &exit,
             genesis_blockhash,
+            completed_slots_receiver,
         );
 
         let (replay_stage, slot_full_receiver, root_slot_receiver) = ReplayStage::new(
@@ -203,8 +205,9 @@ pub mod tests {
         let cref1 = Arc::new(RwLock::new(cluster_info1));
 
         let blocktree_path = get_tmp_ledger_path!();
-        let (blocktree, l_receiver) = Blocktree::open_with_signal(&blocktree_path)
-            .expect("Expected to successfully open ledger");
+        let (blocktree, l_receiver, completed_slots_receiver) =
+            Blocktree::open_with_signal(&blocktree_path)
+                .expect("Expected to successfully open ledger");
         let blocktree = Arc::new(blocktree);
         let bank = bank_forks.working_bank();
         let (exit, poh_recorder, poh_service, _entry_receiver) =
@@ -233,6 +236,7 @@ pub mod tests {
             &leader_schedule_cache,
             &exit,
             &Hash::default(),
+            completed_slots_receiver,
         );
         exit.store(true, Ordering::Relaxed);
         tvu.join().unwrap();
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 2601cbef2b..7745cf807d 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -7,7 +7,7 @@ use crate::cluster_info::ClusterInfo;
 use crate::leader_schedule_cache::LeaderScheduleCache;
 use crate::leader_schedule_utils::slot_leader_at;
 use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
-use crate::repair_service::{RepairService, RepairSlotRange};
+use crate::repair_service::{RepairService, RepairStrategy};
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::streamer::{BlobReceiver, BlobSender};
@@ -175,7 +175,6 @@ pub struct WindowService {
 impl WindowService {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
-        bank_forks: Option<Arc<RwLock<BankForks>>>,
         leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
         blocktree: Arc<Blocktree>,
         cluster_info: Arc<RwLock<ClusterInfo>>,
@@ -183,16 +182,21 @@ impl WindowService {
         retransmit: BlobSender,
         repair_socket: Arc<UdpSocket>,
         exit: &Arc<AtomicBool>,
-        repair_slot_range: Option<RepairSlotRange>,
+        repair_strategy: RepairStrategy,
         genesis_blockhash: &Hash,
     ) -> WindowService {
+        let bank_forks = match repair_strategy {
+            RepairStrategy::RepairRange(_) => None,
+
+            RepairStrategy::RepairAll { ref bank_forks, .. } => Some(bank_forks.clone()),
+        };
+
         let repair_service = RepairService::new(
             blocktree.clone(),
             exit,
             repair_socket,
             cluster_info.clone(),
-            bank_forks.clone(),
-            repair_slot_range,
+            repair_strategy,
         );
         let exit = exit.clone();
         let leader_schedule_cache = leader_schedule_cache.clone();
@@ -207,6 +211,7 @@
                 if exit.load(Ordering::Relaxed) {
                     break;
                 }
+
                 if let Err(e) = recv_window(
                     bank_forks.as_ref(),
                     leader_schedule_cache.as_ref(),
@@ -351,15 +356,18 @@ mod test {
         let t_receiver = blob_receiver(Arc::new(leader_node.sockets.gossip), &exit, s_reader);
         let (s_retransmit, r_retransmit) = channel();
         let blocktree_path = get_tmp_ledger_path!();
-        let blocktree = Arc::new(
-            Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
-        );
+        let (blocktree, _, completed_slots_receiver) = Blocktree::open_with_signal(&blocktree_path)
+            .expect("Expected to be able to open database ledger");
+        let blocktree = Arc::new(blocktree);
+
         let bank = Bank::new(&create_genesis_block_with_leader(100, &me_id, 10).0);
         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
-        let bank_forks = Some(Arc::new(RwLock::new(BankForks::new(0, bank))));
-        let t_window = WindowService::new(
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
+        let repair_strategy = RepairStrategy::RepairAll {
             bank_forks,
+            completed_slots_receiver,
+        };
+        let t_window = WindowService::new(
             Some(leader_schedule_cache),
             blocktree,
             subs,
@@ -367,7 +375,7 @@ mod test {
             s_retransmit,
             Arc::new(leader_node.sockets.repair),
             &exit,
-            None,
+            repair_strategy,
             &Hash::default(),
         );
         let t_responder = {
@@ -430,14 +438,18 @@ mod test {
         let t_receiver = blob_receiver(Arc::new(leader_node.sockets.gossip), &exit, s_reader);
         let (s_retransmit, r_retransmit) = channel();
         let blocktree_path = get_tmp_ledger_path!();
-        let blocktree = Arc::new(
-            Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
-        );
+        let (blocktree, _, completed_slots_receiver) = Blocktree::open_with_signal(&blocktree_path)
+            .expect("Expected to be able to open database ledger");
+
+        let blocktree = Arc::new(blocktree);
         let bank = Bank::new(&create_genesis_block_with_leader(100, &me_id, 10).0);
         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
-        let bank_forks = Some(Arc::new(RwLock::new(BankForks::new(0, bank))));
-        let t_window = WindowService::new(
+        let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
+        let repair_strategy = RepairStrategy::RepairAll {
             bank_forks,
+            completed_slots_receiver,
+        };
+        let t_window = WindowService::new(
             Some(leader_schedule_cache),
             blocktree,
             subs.clone(),
@@ -445,7 +457,7 @@ mod test {
             s_retransmit,
             Arc::new(leader_node.sockets.repair),
             &exit,
-            None,
+            repair_strategy,
             &Hash::default(),
         );
         let t_responder = {
diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs
index bf1d38f973..9f220ab681 100644
--- a/core/tests/tvu.rs
+++ b/core/tests/tvu.rs
@@ -83,8 +83,14 @@ fn test_replay() {
 
     let tvu_addr = target1.info.tvu;
 
-    let (bank_forks, _bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) =
-        fullnode::new_banks_from_blocktree(&blocktree_path, None);
+    let (
+        bank_forks,
+        _bank_forks_info,
+        blocktree,
+        ledger_signal_receiver,
+        completed_slots_receiver,
+        leader_schedule_cache,
+    ) = fullnode::new_banks_from_blocktree(&blocktree_path, None);
     let working_bank = bank_forks.working_bank();
     assert_eq!(
         working_bank.get_balance(&mint_keypair.pubkey()),
@@ -126,6 +132,7 @@ fn test_replay() {
         &leader_schedule_cache,
         &exit,
         &solana_sdk::hash::Hash::default(),
+        completed_slots_receiver,
     );
 
     let mut mint_ref_balance = mint_balance;
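
A closing note: `update_epoch_slots` receives the new channel as `_completed_slots_receiver` and does not read it yet; this patch only wires the plumbing so a follow-up change can consume it. A consumer would presumably look something like the hypothetical helper below (`recv_completed_slots` is not part of the patch; the timeout, drain-then-dedup policy, and the demo `main` are assumptions for illustration):

use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

// Matches the alias the patch adds to blocktree.rs.
type CompletedSlotsReceiver = Receiver<Vec<u64>>;

// Block briefly for one batch, then drain whatever else is queued so a slow
// consumer cannot let the bounded channel back up. Returns None once the
// sender side (the Blocktree) has been dropped.
fn recv_completed_slots(receiver: &CompletedSlotsReceiver) -> Option<Vec<u64>> {
    let mut slots = match receiver.recv_timeout(Duration::from_millis(100)) {
        Ok(slots) => slots,
        Err(RecvTimeoutError::Timeout) => return Some(vec![]),
        Err(RecvTimeoutError::Disconnected) => return None,
    };
    while let Ok(more) = receiver.try_recv() {
        slots.extend(more);
    }
    slots.sort();
    slots.dedup();
    Some(slots)
}

fn main() {
    // Same bound as MAX_COMPLETED_SLOTS_IN_CHANNEL in the patch.
    let (sender, receiver) = std::sync::mpsc::sync_channel(100_000);
    sender.send(vec![5u64, 3]).unwrap();
    sender.send(vec![3, 8]).unwrap();
    assert_eq!(recv_completed_slots(&receiver), Some(vec![3, 5, 8]));
}
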