Add lock to make sure slot-based locktree calls are safe (#7993)

carllin 2020-01-28 13:45:41 -08:00 committed by GitHub
parent 1596c961d9
commit 4ffd7693d6
6 changed files with 79 additions and 27 deletions
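
The heart of the change is a shared watermark, lowest_cleanup_slot: Arc<RwLock<u64>>, on Blockstore. LedgerCleanupService raises it under the write lock just before purging, and every slot-based read path takes the read lock, returns BlockstoreError::SlotCleanedUp when the requested slot falls below the watermark, and keeps the guard alive for the rest of the call so a purge cannot race with a read already in flight. Below is a minimal sketch of that pattern, using simplified stand-in types rather than the real Blockstore; only the lowest_cleanup_slot field and the SlotCleanedUp variant mirror names from the actual change.

use std::sync::{Arc, RwLock};

// Stand-in error type; the real code uses BlockstoreError.
#[derive(Debug, PartialEq)]
enum StoreError {
    SlotCleanedUp,
}

struct Store {
    // Watermark below which slot data may already have been purged.
    lowest_cleanup_slot: Arc<RwLock<u64>>,
}

impl Store {
    // Reader side: refuse slots below the watermark. Binding the read guard
    // keeps it alive for the rest of the call, so the cleanup thread cannot
    // advance the watermark and purge while this read is in progress.
    fn read_slot(&self, slot: u64) -> Result<String, StoreError> {
        let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
        if *lowest_cleanup_slot > slot {
            return Err(StoreError::SlotCleanedUp);
        }
        // ... perform the actual column reads while the guard is held ...
        Ok(format!("entries for slot {}", slot))
    }

    // Writer side (cleanup service): advance the watermark, then purge.
    fn cleanup(&self, lowest_slot: u64) {
        *self.lowest_cleanup_slot.write().unwrap() = lowest_slot;
        // ... purge data for slots up to lowest_slot here ...
    }
}

fn main() {
    let store = Store {
        lowest_cleanup_slot: Arc::new(RwLock::new(0)),
    };
    assert!(store.read_slot(5).is_ok());
    store.cleanup(10);
    assert_eq!(store.read_slot(5), Err(StoreError::SlotCleanedUp));
}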


@@ -58,31 +58,35 @@ impl BlockstreamService {
         let timeout = Duration::new(1, 0);
         let (slot, slot_leader) = slot_full_receiver.recv_timeout(timeout)?;
 
-        let entries = blockstore.get_slot_entries(slot, 0, None).unwrap();
-        let blockstore_meta = blockstore.meta(slot).unwrap().unwrap();
-        let _parent_slot = if slot == 0 {
-            None
-        } else {
-            Some(blockstore_meta.parent_slot)
-        };
-        let ticks_per_slot = entries.iter().filter(|entry| entry.is_tick()).count() as u64;
-        let mut tick_height = ticks_per_slot * slot;
-
-        for (i, entry) in entries.iter().enumerate() {
-            if entry.is_tick() {
-                tick_height += 1;
-            }
-            blockstream
-                .emit_entry_event(slot, tick_height, &slot_leader, &entry)
-                .unwrap_or_else(|e| {
-                    debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
-                });
-            if i == entries.len() - 1 {
-                blockstream
-                    .emit_block_event(slot, tick_height, &slot_leader, entry.hash)
-                    .unwrap_or_else(|e| {
-                        debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
-                    });
-            }
-        }
+        // Slot might not exist due to LedgerCleanupService, check first
+        let blockstore_meta = blockstore.meta(slot).unwrap();
+        if let Some(blockstore_meta) = blockstore_meta {
+            // Return error to main loop. Thread won't exit, will just log the error
+            let entries = blockstore.get_slot_entries(slot, 0, None)?;
+            let _parent_slot = if slot == 0 {
+                None
+            } else {
+                Some(blockstore_meta.parent_slot)
+            };
+            let ticks_per_slot = entries.iter().filter(|entry| entry.is_tick()).count() as u64;
+            let mut tick_height = ticks_per_slot * slot;
+
+            for (i, entry) in entries.iter().enumerate() {
+                if entry.is_tick() {
+                    tick_height += 1;
+                }
+                blockstream
+                    .emit_entry_event(slot, tick_height, &slot_leader, &entry)
+                    .unwrap_or_else(|e| {
+                        debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
+                    });
+                if i == entries.len() - 1 {
+                    blockstream
+                        .emit_block_event(slot, tick_height, &slot_leader, entry.hash)
+                        .unwrap_or_else(|e| {
+                            debug!("Blockstream error: {:?}, {:?}", e, blockstream.output);
+                        });
+                }
+            }
+        }
         Ok(())
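
The re-indentation above comes from wrapping the old body in an if let Some(blockstore_meta) check: by the time a slot-full notification arrives, LedgerCleanupService may already have purged the slot, so the service now skips missing slots and propagates get_slot_entries errors with ? back to the main loop, which logs them instead of panicking on unwrap().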


@@ -68,9 +68,13 @@ impl LedgerCleanupService {
         let disk_utilization_pre = blockstore.storage_size();
 
         let root = new_root_receiver.recv_timeout(Duration::from_secs(1))?;
+
+        // Notify blockstore of impending purge
         if root > *next_purge_batch {
             //cleanup
-            blockstore.purge_slots(0, Some(root - max_ledger_slots));
+            let lowest_slot = root - max_ledger_slots;
+            *blockstore.lowest_cleanup_slot.write().unwrap() = lowest_slot;
+            blockstore.purge_slots(0, Some(lowest_slot));
             *next_purge_batch += DEFAULT_PURGE_BATCH_SIZE;
         }
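
The ordering here matters: the watermark is raised under the write lock before purge_slots runs, so the cleanup thread first waits for in-flight readers to release their read guards, and any slot-based read that starts afterwards fails fast with SlotCleanedUp instead of observing partially deleted data.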


@@ -527,6 +527,9 @@ impl ReplayStage {
         let tx_count = tx_count_after - tx_count_before;
 
         confirm_result.map_err(|err| {
+            // LedgerCleanupService should not be cleaning up anything
+            // that comes after the root, so we should not see any
+            // errors related to the slot being purged
             let slot = bank.slot();
             warn!("Fatal replay error in slot: {}, err: {:?}", slot, err);
             datapoint_error!(


@@ -383,7 +383,11 @@ impl JsonRpcRequestProcessor {
         let stakes = HashMap::new();
         let stakes = bank.epoch_vote_accounts(epoch).unwrap_or(&stakes);
 
-        Ok(self.blockstore.get_block_time(slot, slot_duration, stakes))
+        Ok(self
+            .blockstore
+            .get_block_time(slot, slot_duration, stakes)
+            .ok()
+            .unwrap_or(None))
     }
 }
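
Because get_block_time now returns Result<Option<UnixTimestamp>> instead of Option<UnixTimestamp>, the RPC handler collapses the result back to an Option, treating a purged slot (or any other blockstore error) the same as "no timestamp available". A toy illustration of the .ok().unwrap_or(None) collapse, with placeholder types standing in for the real ones:

fn main() {
    // Any error, such as a slot already purged by the cleanup service, becomes None.
    let purged: Result<Option<i64>, &str> = Err("SlotCleanedUp");
    assert_eq!(purged.ok().unwrap_or(None), None);

    // A successful lookup passes the inner Option straight through.
    let found: Result<Option<i64>, &str> = Ok(Some(1_580_000_000));
    assert_eq!(found.ok().unwrap_or(None), Some(1_580_000_000));
}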


@@ -90,6 +90,7 @@ pub struct Blockstore {
     insert_shreds_lock: Arc<Mutex<()>>,
     pub new_shreds_signals: Vec<SyncSender<bool>>,
     pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
+    pub lowest_cleanup_slot: Arc<RwLock<u64>>,
 }
 
 pub struct IndexMetaWorkingSetEntry {
@@ -207,7 +208,7 @@ impl Blockstore {
         measure.stop();
         info!("{:?} {}", blockstore_path, measure);
 
-        Ok(Blockstore {
+        let blockstore = Blockstore {
             db,
             meta_cf,
             dead_slots_cf,
@@ -222,7 +223,9 @@ impl Blockstore {
             completed_slots_senders: vec![],
             insert_shreds_lock: Arc::new(Mutex::new(())),
             last_root,
-        })
+            lowest_cleanup_slot: Arc::new(RwLock::new(0)),
+        };
+        Ok(blockstore)
     }
 
     pub fn open_with_signal(
@@ -1059,6 +1062,12 @@ impl Blockstore {
         to_index: u64,
         buffer: &mut [u8],
     ) -> Result<(u64, usize)> {
+        // lowest_cleanup_slot is the last slot that was not cleaned up by
+        // LedgerCleanupService
+        let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
+        if *lowest_cleanup_slot > slot {
+            return Err(BlockstoreError::SlotCleanedUp);
+        }
         let meta_cf = self.db.column::<cf::SlotMeta>();
         let mut buffer_offset = 0;
         let mut last_index = 0;
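
Note that the read guard is bound to a local rather than dropped immediately after the comparison: the RwLockReadGuard lives until the end of the function, so LedgerCleanupService cannot take the write lock and purge while the rest of this read is still in progress. The comparison alone, without holding the guard, would not close that race.
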
@@ -1288,14 +1297,26 @@ impl Blockstore {
         slot: Slot,
         slot_duration: Duration,
         stakes: &HashMap<Pubkey, (u64, Account)>,
-    ) -> Option<UnixTimestamp> {
+    ) -> Result<Option<UnixTimestamp>> {
+        let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
+        // lowest_cleanup_slot is the last slot that was not cleaned up by
+        // LedgerCleanupService
+        if *lowest_cleanup_slot > slot {
+            return Err(BlockstoreError::SlotCleanedUp);
+        }
         let unique_timestamps: HashMap<Pubkey, (Slot, UnixTimestamp)> = self
             .get_timestamp_slots(slot, TIMESTAMP_SLOT_INTERVAL, TIMESTAMP_SLOT_RANGE)
             .into_iter()
             .flat_map(|query_slot| self.get_block_timestamps(query_slot).unwrap_or_default())
             .collect();
-        calculate_stake_weighted_timestamp(unique_timestamps, stakes, slot, slot_duration)
+        Ok(calculate_stake_weighted_timestamp(
+            unique_timestamps,
+            stakes,
+            slot,
+            slot_duration,
+        ))
     }
 
     fn get_timestamp_slots(
@@ -1346,6 +1367,12 @@ impl Blockstore {
         slot: Slot,
         encoding: Option<RpcTransactionEncoding>,
     ) -> Result<RpcConfirmedBlock> {
+        let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
+        // lowest_cleanup_slot is the last slot that was not cleaned up by
+        // LedgerCleanupService
+        if *lowest_cleanup_slot > slot {
+            return Err(BlockstoreError::SlotCleanedUp);
+        }
         let encoding = encoding.unwrap_or(RpcTransactionEncoding::Json);
         if self.is_root(slot) {
             let slot_meta_cf = self.db.column::<cf::SlotMeta>();
@@ -1466,6 +1493,14 @@ impl Blockstore {
         if self.is_dead(slot) {
             return Err(BlockstoreError::DeadSlot);
         }
+
+        // lowest_cleanup_slot is the last slot that was not cleaned up by
+        // LedgerCleanupService
+        let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
+        if *lowest_cleanup_slot > slot {
+            return Err(BlockstoreError::SlotCleanedUp);
+        }
+
         let slot_meta_cf = self.db.column::<cf::SlotMeta>();
         let slot_meta = slot_meta_cf.get(slot)?;
         if slot_meta.is_none() {
@@ -4886,10 +4921,11 @@ pub mod tests {
             })
            .sum();
         expected_time /= total_stake;
-        assert_eq!(block_time_slot_3.unwrap() as u64, expected_time);
+        assert_eq!(block_time_slot_3.unwrap().unwrap() as u64, expected_time);
         assert_eq!(
             blockstore
                 .get_block_time(8, slot_duration.clone(), &stakes)
+                .unwrap()
                 .unwrap() as u64,
             expected_time + 2 // At 400ms block duration, 5 slots == 2sec
         );


@@ -49,6 +49,7 @@ pub enum BlockstoreError {
     IO(#[from] std::io::Error),
     Serialize(#[from] Box<bincode::ErrorKind>),
     FsExtraError(#[from] fs_extra::error::Error),
+    SlotCleanedUp,
 }
 
 pub(crate) type Result<T> = std::result::Result<T, BlockstoreError>;
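
Callers that only want a best-effort answer, such as the RPC handler above, can discard the new variant with .ok(); callers that care can match on BlockstoreError::SlotCleanedUp to tell a purged slot apart from a dead or missing one.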