Fix race in `blocktree.insert_shreds` (#6550)
* Add guard for blocktree insert_shreds * Add test
This commit is contained in:
parent
e174af7838
commit
e1b35f9847
|
@ -30,7 +30,7 @@ use std::fs;
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::rc::Rc;
|
||||
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
pub const BLOCKTREE_DIRECTORY: &str = "rocksdb";
|
||||
|
||||
|
@ -55,6 +55,7 @@ pub struct Blocktree {
|
|||
data_shred_cf: LedgerColumn<cf::ShredData>,
|
||||
code_shred_cf: LedgerColumn<cf::ShredCode>,
|
||||
last_root: Arc<RwLock<u64>>,
|
||||
insert_shreds_lock: Arc<Mutex<()>>,
|
||||
pub new_shreds_signals: Vec<SyncSender<bool>>,
|
||||
pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
|
||||
}
|
||||
|
@ -106,6 +107,7 @@ impl Blocktree {
|
|||
code_shred_cf,
|
||||
new_shreds_signals: vec![],
|
||||
completed_slots_senders: vec![],
|
||||
insert_shreds_lock: Arc::new(Mutex::new(())),
|
||||
last_root,
|
||||
})
|
||||
}
|
||||
|
@ -359,6 +361,7 @@ impl Blocktree {
|
|||
shreds: Vec<Shred>,
|
||||
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
|
||||
) -> Result<()> {
|
||||
let _lock = self.insert_shreds_lock.lock().unwrap();
|
||||
let db = &*self.db;
|
||||
let mut write_batch = db.batch()?;
|
||||
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
#[macro_use]
|
||||
extern crate solana_ledger;
|
||||
|
||||
use solana_ledger::blocktree::{self, get_tmp_ledger_path, Blocktree};
|
||||
use solana_ledger::entry;
|
||||
use solana_sdk::hash::Hash;
|
||||
use std::sync::Arc;
|
||||
use std::thread::Builder;
|
||||
|
||||
#[test]
|
||||
fn test_multiple_threads_insert_shred() {
|
||||
let blocktree_path = get_tmp_ledger_path!();
|
||||
let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
|
||||
|
||||
for _ in 0..100 {
|
||||
let num_threads = 10;
|
||||
|
||||
// Create `num_threads` different ticks in slots 1..num_therads + 1, all
|
||||
// with parent = slot 0
|
||||
let threads: Vec<_> = (0..num_threads)
|
||||
.map(|i| {
|
||||
let entries = entry::create_ticks(1, Hash::default());
|
||||
let shreds = blocktree::entries_to_test_shreds(entries, i + 1, 0, false);
|
||||
let blocktree_ = blocktree.clone();
|
||||
Builder::new()
|
||||
.name("blocktree-writer".to_string())
|
||||
.spawn(move || {
|
||||
blocktree_.insert_shreds(shreds, None).unwrap();
|
||||
})
|
||||
.unwrap()
|
||||
})
|
||||
.collect();
|
||||
|
||||
for t in threads {
|
||||
t.join().unwrap()
|
||||
}
|
||||
|
||||
// Check slot 0 has the correct children
|
||||
let mut meta0 = blocktree.meta(0).unwrap().unwrap();
|
||||
meta0.next_slots.sort();
|
||||
let expected_next_slots: Vec<_> = (1..num_threads + 1).collect();
|
||||
assert_eq!(meta0.next_slots, expected_next_slots);
|
||||
|
||||
// Delete slots for next iteration
|
||||
blocktree.purge_slots(0, None);
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
drop(blocktree);
|
||||
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
|
||||
}
|
Loading…
Reference in New Issue