From e1b35f9847ec96140a98a3105bc4aeaf95a8be18 Mon Sep 17 00:00:00 2001 From: carllin Date: Sat, 26 Oct 2019 04:09:58 -0700 Subject: [PATCH] Fix race in `blocktree.insert_shreds` (#6550) * Add guard for blocktree insert_shreds * Add test --- ledger/src/blocktree.rs | 5 +++- ledger/tests/blocktree.rs | 51 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 ledger/tests/blocktree.rs diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index 77118cfcd..a10419ed3 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -30,7 +30,7 @@ use std::fs; use std::path::{Path, PathBuf}; use std::rc::Rc; use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError}; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; pub const BLOCKTREE_DIRECTORY: &str = "rocksdb"; @@ -55,6 +55,7 @@ pub struct Blocktree { data_shred_cf: LedgerColumn, code_shred_cf: LedgerColumn, last_root: Arc>, + insert_shreds_lock: Arc>, pub new_shreds_signals: Vec>, pub completed_slots_senders: Vec>>, } @@ -106,6 +107,7 @@ impl Blocktree { code_shred_cf, new_shreds_signals: vec![], completed_slots_senders: vec![], + insert_shreds_lock: Arc::new(Mutex::new(())), last_root, }) } @@ -359,6 +361,7 @@ impl Blocktree { shreds: Vec, leader_schedule: Option<&Arc>, ) -> Result<()> { + let _lock = self.insert_shreds_lock.lock().unwrap(); let db = &*self.db; let mut write_batch = db.batch()?; diff --git a/ledger/tests/blocktree.rs b/ledger/tests/blocktree.rs new file mode 100644 index 000000000..87dfa3e05 --- /dev/null +++ b/ledger/tests/blocktree.rs @@ -0,0 +1,51 @@ +#[macro_use] +extern crate solana_ledger; + +use solana_ledger::blocktree::{self, get_tmp_ledger_path, Blocktree}; +use solana_ledger::entry; +use solana_sdk::hash::Hash; +use std::sync::Arc; +use std::thread::Builder; + +#[test] +fn test_multiple_threads_insert_shred() { + let blocktree_path = get_tmp_ledger_path!(); + let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap()); + + for _ in 0..100 { + let num_threads = 10; + + // Create `num_threads` different ticks in slots 1..num_therads + 1, all + // with parent = slot 0 + let threads: Vec<_> = (0..num_threads) + .map(|i| { + let entries = entry::create_ticks(1, Hash::default()); + let shreds = blocktree::entries_to_test_shreds(entries, i + 1, 0, false); + let blocktree_ = blocktree.clone(); + Builder::new() + .name("blocktree-writer".to_string()) + .spawn(move || { + blocktree_.insert_shreds(shreds, None).unwrap(); + }) + .unwrap() + }) + .collect(); + + for t in threads { + t.join().unwrap() + } + + // Check slot 0 has the correct children + let mut meta0 = blocktree.meta(0).unwrap().unwrap(); + meta0.next_slots.sort(); + let expected_next_slots: Vec<_> = (1..num_threads + 1).collect(); + assert_eq!(meta0.next_slots, expected_next_slots); + + // Delete slots for next iteration + blocktree.purge_slots(0, None); + } + + // Cleanup + drop(blocktree); + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); +}