From 4ffd7693d6db673f294bcffeca76af3e08f5ac91 Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 28 Jan 2020 13:45:41 -0800 Subject: [PATCH] Add lock to make sure slot-based locktree calls are safe (#7993) --- core/src/blockstream_service.rs | 44 +++++++++++++++------------- core/src/ledger_cleanup_service.rs | 6 +++- core/src/replay_stage.rs | 3 ++ core/src/rpc.rs | 6 +++- ledger/src/blockstore.rs | 46 ++++++++++++++++++++++++++---- ledger/src/blockstore_db.rs | 1 + 6 files changed, 79 insertions(+), 27 deletions(-) diff --git a/core/src/blockstream_service.rs b/core/src/blockstream_service.rs index ce7a66e9bd..7632cb3784 100644 --- a/core/src/blockstream_service.rs +++ b/core/src/blockstream_service.rs @@ -58,31 +58,35 @@ impl BlockstreamService { let timeout = Duration::new(1, 0); let (slot, slot_leader) = slot_full_receiver.recv_timeout(timeout)?; - let entries = blockstore.get_slot_entries(slot, 0, None).unwrap(); - let blockstore_meta = blockstore.meta(slot).unwrap().unwrap(); - let _parent_slot = if slot == 0 { - None - } else { - Some(blockstore_meta.parent_slot) - }; - let ticks_per_slot = entries.iter().filter(|entry| entry.is_tick()).count() as u64; - let mut tick_height = ticks_per_slot * slot; + // Slot might not exist due to LedgerCleanupService, check first + let blockstore_meta = blockstore.meta(slot).unwrap(); + if let Some(blockstore_meta) = blockstore_meta { + // Return error to main loop. Thread won't exit, will just log the error + let entries = blockstore.get_slot_entries(slot, 0, None)?; + let _parent_slot = if slot == 0 { + None + } else { + Some(blockstore_meta.parent_slot) + }; + let ticks_per_slot = entries.iter().filter(|entry| entry.is_tick()).count() as u64; + let mut tick_height = ticks_per_slot * slot; - for (i, entry) in entries.iter().enumerate() { - if entry.is_tick() { - tick_height += 1; - } - blockstream - .emit_entry_event(slot, tick_height, &slot_leader, &entry) - .unwrap_or_else(|e| { - debug!("Blockstream error: {:?}, {:?}", e, blockstream.output); - }); - if i == entries.len() - 1 { + for (i, entry) in entries.iter().enumerate() { + if entry.is_tick() { + tick_height += 1; + } blockstream - .emit_block_event(slot, tick_height, &slot_leader, entry.hash) + .emit_entry_event(slot, tick_height, &slot_leader, &entry) .unwrap_or_else(|e| { debug!("Blockstream error: {:?}, {:?}", e, blockstream.output); }); + if i == entries.len() - 1 { + blockstream + .emit_block_event(slot, tick_height, &slot_leader, entry.hash) + .unwrap_or_else(|e| { + debug!("Blockstream error: {:?}, {:?}", e, blockstream.output); + }); + } } } Ok(()) diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 11bb8879fc..23b7165dea 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -68,9 +68,13 @@ impl LedgerCleanupService { let disk_utilization_pre = blockstore.storage_size(); let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?; + + // Notify blockstore of impending purge if root > *next_purge_batch { //cleanup - blockstore.purge_slots(0, Some(root - max_ledger_slots)); + let lowest_slot = root - max_ledger_slots; + *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_slot; + blockstore.purge_slots(0, Some(lowest_slot)); *next_purge_batch += DEFAULT_PURGE_BATCH_SIZE; } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index c2a64c8683..188fef87fb 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -527,6 +527,9 @@ impl ReplayStage { let tx_count = tx_count_after - tx_count_before; confirm_result.map_err(|err| { + // LedgerCleanupService should not be cleaning up anything + // that comes after the root, so we should not see any + // errors related to the slot being purged let slot = bank.slot(); warn!("Fatal replay error in slot: {}, err: {:?}", slot, err); datapoint_error!( diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 7f788c6f29..d466a22590 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -383,7 +383,11 @@ impl JsonRpcRequestProcessor { let stakes = HashMap::new(); let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes); - Ok(self.blockstore.get_block_time(slot, slot_duration, stakes)) + Ok(self + .blockstore + .get_block_time(slot, slot_duration, stakes) + .ok() + .unwrap_or(None)) } } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 2fec64d948..4564569d71 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -90,6 +90,7 @@ pub struct Blockstore { insert_shreds_lock: Arc>, pub new_shreds_signals: Vec>, pub completed_slots_senders: Vec>>, + pub lowest_cleanup_slot: Arc>, } pub struct IndexMetaWorkingSetEntry { @@ -207,7 +208,7 @@ impl Blockstore { measure.stop(); info!("{:?} {}", blockstore_path, measure); - Ok(Blockstore { + let blockstore = Blockstore { db, meta_cf, dead_slots_cf, @@ -222,7 +223,9 @@ impl Blockstore { completed_slots_senders: vec![], insert_shreds_lock: Arc::new(Mutex::new(())), last_root, - }) + lowest_cleanup_slot: Arc::new(RwLock::new(0)), + }; + Ok(blockstore) } pub fn open_with_signal( @@ -1059,6 +1062,12 @@ impl Blockstore { to_index: u64, buffer: &mut [u8], ) -> Result<(u64, usize)> { + // lowest_cleanup_slot is the last slot that was not cleaned up by + // LedgerCleanupService + let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); + if *lowest_cleanup_slot > slot { + return Err(BlockstoreError::SlotCleanedUp); + } let meta_cf = self.db.column::(); let mut buffer_offset = 0; let mut last_index = 0; @@ -1288,14 +1297,26 @@ impl Blockstore { slot: Slot, slot_duration: Duration, stakes: &HashMap, - ) -> Option { + ) -> Result> { + let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); + // lowest_cleanup_slot is the last slot that was not cleaned up by + // LedgerCleanupService + if *lowest_cleanup_slot > slot { + return Err(BlockstoreError::SlotCleanedUp); + } + let unique_timestamps: HashMap = self .get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE) .into_iter() .flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default()) .collect(); - calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration) + Ok(calculate_stake_weighted_timestamp( + unique_timestamps, + stakes, + slot, + slot_duration, + )) } fn get_timestamp_slots( @@ -1346,6 +1367,12 @@ impl Blockstore { slot: Slot, encoding: Option, ) -> Result { + let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); + // lowest_cleanup_slot is the last slot that was not cleaned up by + // LedgerCleanupService + if *lowest_cleanup_slot > slot { + return Err(BlockstoreError::SlotCleanedUp); + } let encoding = encoding.unwrap_or(RpcTransactionEncoding::Json); if self.is_root(slot) { let slot_meta_cf = self.db.column::(); @@ -1466,6 +1493,14 @@ impl Blockstore { if self.is_dead(slot) { return Err(BlockstoreError::DeadSlot); } + + // lowest_cleanup_slot is the last slot that was not cleaned up by + // LedgerCleanupService + let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); + if *lowest_cleanup_slot > slot { + return Err(BlockstoreError::SlotCleanedUp); + } + let slot_meta_cf = self.db.column::(); let slot_meta = slot_meta_cf.get(slot)?; if slot_meta.is_none() { @@ -4886,10 +4921,11 @@ pub mod tests { }) .sum(); expected_time /= total_stake; - assert_eq!(block_time_slot_3.unwrap() as u64, expected_time); + assert_eq!(block_time_slot_3.unwrap().unwrap() as u64, expected_time); assert_eq!( blockstore .get_block_time(8, slot_duration.clone(), &stakes) + .unwrap() .unwrap() as u64, expected_time + 2 // At 400ms block duration, 5 slots == 2sec ); diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index acb8fff7b7..06ba489092 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -49,6 +49,7 @@ pub enum BlockstoreError { IO(#[from] std::io::Error), Serialize(#[from] Box), FsExtraError(#[from] fs_extra::error::Error), + SlotCleanedUp, } pub(crate) type Result = std::result::Result;