From f91ffbbfdf30de3b284b6c9c6c65c723501a812e Mon Sep 17 00:00:00 2001 From: carllin Date: Mon, 13 Jan 2020 17:21:39 -0800 Subject: [PATCH] Add support in BlockStore for tracking duplicate slots (#7761) * Add test * Add new column family to track duplicate slots * Fix clippy errors * Introduce new SlotColumn for common implementation of Column trait --- Cargo.lock | 1 + ledger-tool/src/main.rs | 4 +- ledger/Cargo.toml | 1 + ledger/src/blockstore.rs | 113 +++++++++++++++-- ledger/src/blockstore_db.rs | 228 +++++++++++++++------------------- ledger/src/blockstore_meta.rs | 14 +++ 6 files changed, 219 insertions(+), 142 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 12b8db4be..bf23b5dd8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3907,6 +3907,7 @@ dependencies = [ "rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "rocksdb 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_bytes 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "solana-budget-program 0.23.0", diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index d4beb95bc..4259c4ee4 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -394,7 +394,9 @@ fn graph_forks( dot.join("\n") } -fn analyze_column( +fn analyze_column< + T: solana_ledger::blockstore_db::Column + solana_ledger::blockstore_db::ColumnName, +>( db: &Database, name: &str, key_size: usize, diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml index 89edd2a50..6edabdd2b 100644 --- a/ledger/Cargo.toml +++ b/ledger/Cargo.toml @@ -28,6 +28,7 @@ rand_chacha = "0.1.1" rayon = "1.2.0" reed-solomon-erasure = { package = "solana-reed-solomon-erasure", version = "4.0.1-3", features = ["simd-accel"] } serde = "1.0.104" +serde_bytes = "0.11.3" serde_derive = "1.0.103" solana-client = { path = "../client", version = "0.23.0" } solana-genesis-programs = { path = "../genesis-programs", version = "0.23.0" } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 53a796310..00bc0d6e5 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -77,6 +77,7 @@ pub struct Blockstore { db: Arc, meta_cf: LedgerColumn, dead_slots_cf: LedgerColumn, + duplicate_slots_cf: LedgerColumn, erasure_meta_cf: LedgerColumn, orphans_cf: LedgerColumn, index_cf: LedgerColumn, @@ -179,7 +180,7 @@ impl Blockstore { // Create the dead slots column family let dead_slots_cf = db.column(); - + let duplicate_slots_cf = db.column(); let erasure_meta_cf = db.column(); // Create the orphans column family. An "orphan" is defined as @@ -208,6 +209,7 @@ impl Blockstore { db, meta_cf, dead_slots_cf, + duplicate_slots_cf, erasure_meta_cf, orphans_cf, index_cf, @@ -303,39 +305,43 @@ impl Blockstore { let columns_empty = self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or_else(|_| false) + .unwrap_or(false) & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or_else(|_| false) + .unwrap_or(false) & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or_else(|_| false) + .unwrap_or(false) & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or_else(|_| false) + .unwrap_or(false) & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or_else(|_| false) + .unwrap_or(false) + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .unwrap_or(false) & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or_else(|_| false) + .unwrap_or(false) & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or_else(|_| false) + .unwrap_or(false) & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or_else(|_| false) + .unwrap_or(false) & self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or_else(|_| false); + .unwrap_or(false); if let Err(e) = self.db.write(write_batch) { error!( "Error: {:?} while submitting write batch for slot {:?} retrying...", @@ -368,6 +374,10 @@ impl Blockstore { .dead_slots_cf .compact_range(from_slot, to_slot) .unwrap_or(false) + && self + .duplicate_slots_cf + .compact_range(from_slot, to_slot) + .unwrap_or(false) && self .erasure_meta_cf .compact_range(from_slot, to_slot) @@ -1626,6 +1636,37 @@ impl Blockstore { self.dead_slots_cf.put(slot, &true) } + pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec, shred2: Vec) -> Result<()> { + let duplicate_slot_proof = DuplicateSlotProof::new(shred1, shred2); + self.duplicate_slots_cf.put(slot, &duplicate_slot_proof) + } + + pub fn get_duplicate_slot(&self, slot: u64) -> Option { + self.duplicate_slots_cf + .get(slot) + .expect("fetch from DuplicateSlots column family failed") + } + + // `new_shred` is asssumed to have slot and index equal to the given slot and index. + // Returns true if `new_shred` is not equal to the existing shred at the given + // slot and index as this implies the leader generated two different shreds with + // the same slot and index + pub fn is_shred_duplicate(&self, slot: u64, index: u64, new_shred: &[u8]) -> bool { + let res = self + .get_data_shred(slot, index) + .expect("fetch from DuplicateSlots column family failed"); + + res.map(|existing_shred| existing_shred != new_shred) + .unwrap_or(false) + } + + pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool { + self.duplicate_slots_cf + .get(slot) + .expect("fetch from DuplicateSlots column family failed") + .is_some() + } + pub fn get_orphans(&self, max: Option) -> Vec { let mut results = vec![]; @@ -2411,6 +2452,13 @@ pub mod tests { .next() .map(|(slot, _)| slot >= min_slot) .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|(slot, _)| slot >= min_slot) + .unwrap_or(true) & blockstore .db .iter::(IteratorMode::Start) @@ -5193,4 +5241,49 @@ pub mod tests { let num_coding_in_index = index.coding().num_shreds(); assert_eq!(num_coding_in_index, num_coding); } + + #[test] + fn test_duplicate_slot() { + let slot = 0; + let entries1 = make_slot_entries_with_transactions(1); + let entries2 = make_slot_entries_with_transactions(1); + let leader_keypair = Arc::new(Keypair::new()); + let shredder = Shredder::new(slot, 0, 1.0, leader_keypair.clone(), 0, 0) + .expect("Failed in creating shredder"); + let (shreds, _, _) = shredder.entries_to_shreds(&entries1, true, 0); + let (duplicate_shreds, _, _) = shredder.entries_to_shreds(&entries2, true, 0); + let shred = shreds[0].clone(); + let duplicate_shred = duplicate_shreds[0].clone(); + let non_duplicate_shred = shred.clone(); + + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + blockstore + .insert_shreds(vec![shred.clone()], None, false) + .unwrap(); + + // No duplicate shreds exist yet + assert!(!blockstore.has_duplicate_shreds_in_slot(slot)); + + // Check if shreds are duplicated + assert!(blockstore.is_shred_duplicate(slot, 0, &duplicate_shred.payload)); + assert!(!blockstore.is_shred_duplicate(slot, 0, &non_duplicate_shred.payload)); + + // Store a duplicate shred + blockstore + .store_duplicate_slot(slot, shred.payload.clone(), duplicate_shred.payload.clone()) + .unwrap(); + + // Slot is now marked as duplicate + assert!(blockstore.has_duplicate_shreds_in_slot(slot)); + + // Check ability to fetch the duplicates + let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap(); + assert_eq!(duplicate_proof.shred1, shred.payload); + assert_eq!(duplicate_proof.shred2, duplicate_shred.payload); + } + + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index a801cb408..e61b5fffe 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -21,6 +21,10 @@ const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB const META_CF: &str = "meta"; // Column family for slots that have been marked as dead const DEAD_SLOTS_CF: &str = "dead_slots"; +// Column family for storing proof that there were multiple +// versions of a slot +const DUPLICATE_SLOTS_CF: &str = "duplicate_slots"; +// Column family storing erasure metadata for a slot const ERASURE_META_CF: &str = "erasure_meta"; // Column family for orphans data const ORPHANS_CF: &str = "orphans"; @@ -62,17 +66,21 @@ pub enum IteratorMode { pub mod columns { #[derive(Debug)] - /// SlotMeta Column + /// The slot metadata column pub struct SlotMeta; #[derive(Debug)] - /// Orphans Column + /// The orphans column pub struct Orphans; #[derive(Debug)] - /// Data Column + /// The dead slots column pub struct DeadSlots; + #[derive(Debug)] + /// The duplicate slots column + pub struct DuplicateSlots; + #[derive(Debug)] /// The erasure meta column pub struct ErasureMeta; @@ -104,8 +112,8 @@ struct Rocks(rocksdb::DB); impl Rocks { fn open(path: &Path) -> Result { use columns::{ - DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta, - TransactionStatus, + DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, + SlotMeta, TransactionStatus, }; fs::create_dir_all(&path)?; @@ -117,6 +125,8 @@ impl Rocks { let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options()); let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options()); + let duplicate_slots_cf_descriptor = + ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options()); let erasure_meta_cf_descriptor = ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options()); let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options()); @@ -132,6 +142,7 @@ impl Rocks { let cfs = vec![ meta_cf_descriptor, dead_slots_cf_descriptor, + duplicate_slots_cf_descriptor, erasure_meta_cf_descriptor, orphans_cf_descriptor, root_cf_descriptor, @@ -149,13 +160,14 @@ impl Rocks { fn columns(&self) -> Vec<&'static str> { use columns::{ - DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta, - TransactionStatus, + DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, + SlotMeta, TransactionStatus, }; vec![ ErasureMeta::NAME, DeadSlots::NAME, + DuplicateSlots::NAME, Index::NAME, Orphans::NAME, Root::NAME, @@ -226,7 +238,6 @@ impl Rocks { } pub trait Column { - const NAME: &'static str; type Index; fn key_size() -> usize { @@ -239,6 +250,10 @@ pub trait Column { fn as_index(slot: Slot) -> Self::Index; } +pub trait ColumnName { + const NAME: &'static str; +} + pub trait TypedColumn: Column { type Type: Serialize + DeserializeOwned; } @@ -247,8 +262,31 @@ impl TypedColumn for columns::TransactionStatus { type Type = RpcTransactionStatus; } +pub trait SlotColumn {} + +impl Column for T { + type Index = u64; + + fn key(slot: u64) -> Vec { + let mut key = vec![0; 8]; + BigEndian::write_u64(&mut key[..], slot); + key + } + + fn index(key: &[u8]) -> u64 { + BigEndian::read_u64(&key[..8]) + } + + fn slot(index: u64) -> Slot { + index + } + + fn as_index(slot: Slot) -> u64 { + slot + } +} + impl Column for columns::TransactionStatus { - const NAME: &'static str = TRANSACTION_STATUS_CF; type Index = (Slot, Signature); fn key((slot, index): (Slot, Signature)) -> Vec { @@ -273,8 +311,11 @@ impl Column for columns::TransactionStatus { } } +impl ColumnName for columns::TransactionStatus { + const NAME: &'static str = TRANSACTION_STATUS_CF; +} + impl Column for columns::ShredCode { - const NAME: &'static str = CODE_SHRED_CF; type Index = (u64, u64); fn key(index: (u64, u64)) -> Vec { @@ -294,8 +335,11 @@ impl Column for columns::ShredCode { } } +impl ColumnName for columns::ShredCode { + const NAME: &'static str = CODE_SHRED_CF; +} + impl Column for columns::ShredData { - const NAME: &'static str = DATA_SHRED_CF; type Index = (u64, u64); fn key((slot, index): (u64, u64)) -> Vec { @@ -320,143 +364,59 @@ impl Column for columns::ShredData { } } -impl Column for columns::Index { - const NAME: &'static str = INDEX_CF; - type Index = u64; - - fn key(slot: Slot) -> Vec { - let mut key = vec![0; 8]; - BigEndian::write_u64(&mut key[..], slot); - key - } - - fn index(key: &[u8]) -> u64 { - BigEndian::read_u64(&key[..8]) - } - - fn slot(index: Self::Index) -> Slot { - index - } - - fn as_index(slot: Slot) -> Self::Index { - slot - } +impl ColumnName for columns::ShredData { + const NAME: &'static str = DATA_SHRED_CF; } +impl SlotColumn for columns::Index {} +impl ColumnName for columns::Index { + const NAME: &'static str = INDEX_CF; +} impl TypedColumn for columns::Index { type Type = blockstore_meta::Index; } -impl Column for columns::DeadSlots { +impl SlotColumn for columns::DeadSlots {} +impl ColumnName for columns::DeadSlots { const NAME: &'static str = DEAD_SLOTS_CF; - type Index = u64; - - fn key(slot: Slot) -> Vec { - let mut key = vec![0; 8]; - BigEndian::write_u64(&mut key[..], slot); - key - } - - fn index(key: &[u8]) -> u64 { - BigEndian::read_u64(&key[..8]) - } - - fn slot(index: Self::Index) -> Slot { - index - } - - fn as_index(slot: Slot) -> Self::Index { - slot - } } - impl TypedColumn for columns::DeadSlots { type Type = bool; } -impl Column for columns::Orphans { - const NAME: &'static str = ORPHANS_CF; - type Index = u64; - - fn key(slot: Slot) -> Vec { - let mut key = vec![0; 8]; - BigEndian::write_u64(&mut key[..], slot); - key - } - - fn index(key: &[u8]) -> u64 { - BigEndian::read_u64(&key[..8]) - } - - fn slot(index: Self::Index) -> Slot { - index - } - - fn as_index(slot: Slot) -> Self::Index { - slot - } +impl SlotColumn for columns::DuplicateSlots {} +impl ColumnName for columns::DuplicateSlots { + const NAME: &'static str = DUPLICATE_SLOTS_CF; +} +impl TypedColumn for columns::DuplicateSlots { + type Type = blockstore_meta::DuplicateSlotProof; } +impl SlotColumn for columns::Orphans {} +impl ColumnName for columns::Orphans { + const NAME: &'static str = ORPHANS_CF; +} impl TypedColumn for columns::Orphans { type Type = bool; } -impl Column for columns::Root { +impl SlotColumn for columns::Root {} +impl ColumnName for columns::Root { const NAME: &'static str = ROOT_CF; - type Index = u64; - - fn key(slot: Slot) -> Vec { - let mut key = vec![0; 8]; - BigEndian::write_u64(&mut key[..], slot); - key - } - - fn index(key: &[u8]) -> u64 { - BigEndian::read_u64(&key[..8]) - } - - fn slot(index: Self::Index) -> Slot { - index - } - - fn as_index(slot: Slot) -> Self::Index { - slot - } } - impl TypedColumn for columns::Root { type Type = bool; } -impl Column for columns::SlotMeta { +impl SlotColumn for columns::SlotMeta {} +impl ColumnName for columns::SlotMeta { const NAME: &'static str = META_CF; - type Index = u64; - - fn key(slot: Slot) -> Vec { - let mut key = vec![0; 8]; - BigEndian::write_u64(&mut key[..], slot); - key - } - - fn index(key: &[u8]) -> u64 { - BigEndian::read_u64(&key[..8]) - } - - fn slot(index: Self::Index) -> Slot { - index - } - - fn as_index(slot: Slot) -> Self::Index { - slot - } } - impl TypedColumn for columns::SlotMeta { type Type = blockstore_meta::SlotMeta; } impl Column for columns::ErasureMeta { - const NAME: &'static str = ERASURE_META_CF; type Index = (u64, u64); fn index(key: &[u8]) -> (u64, u64) { @@ -481,7 +441,9 @@ impl Column for columns::ErasureMeta { (slot, 0) } } - +impl ColumnName for columns::ErasureMeta { + const NAME: &'static str = ERASURE_META_CF; +} impl TypedColumn for columns::ErasureMeta { type Type = blockstore_meta::ErasureMeta; } @@ -524,7 +486,7 @@ impl Database { pub fn get(&self, key: C::Index) -> Result> where - C: TypedColumn, + C: TypedColumn + ColumnName, { if let Some(serialized_value) = self.backend.get_cf(self.cf_handle::(), &C::key(key))? { let value = deserialize(&serialized_value)?; @@ -540,7 +502,7 @@ impl Database { iterator_mode: IteratorMode, ) -> Result)> + 'a> where - C: Column, + C: Column + ColumnName, { let cf = self.cf_handle::(); let iter = self.backend.iterator_cf::(cf, iterator_mode)?; @@ -548,16 +510,16 @@ impl Database { } #[inline] - pub fn cf_handle(&self) -> &ColumnFamily + pub fn cf_handle(&self) -> &ColumnFamily where - C: Column, + C: Column + ColumnName, { self.backend.cf_handle(C::NAME) } pub fn column(&self) -> LedgerColumn where - C: Column, + C: Column + ColumnName, { LedgerColumn { backend: Arc::clone(&self.backend), @@ -594,7 +556,7 @@ impl Database { // its end pub fn delete_range_cf(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result where - C: Column, + C: Column + ColumnName, { let cf = self.cf_handle::(); let from_index = C::as_index(from); @@ -612,7 +574,7 @@ impl Database { impl LedgerColumn where - C: Column, + C: Column + ColumnName, { pub fn get_bytes(&self, key: C::Index) -> Result>> { self.backend.get_cf(self.handle(), &C::key(key)) @@ -634,7 +596,7 @@ where to: Option, ) -> Result where - C::Index: PartialOrd + Copy, + C::Index: PartialOrd + Copy + ColumnName, { let mut end = true; let iter_config = match from { @@ -691,7 +653,7 @@ where impl LedgerColumn where - C: TypedColumn, + C: TypedColumn + ColumnName, { pub fn get(&self, key: C::Index) -> Result> { if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? { @@ -712,19 +674,23 @@ where } impl<'a> WriteBatch<'a> { - pub fn put_bytes(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> { + pub fn put_bytes(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> { self.write_batch .put_cf(self.get_cf::(), &C::key(key), bytes)?; Ok(()) } - pub fn delete(&mut self, key: C::Index) -> Result<()> { + pub fn delete(&mut self, key: C::Index) -> Result<()> { self.write_batch .delete_cf(self.get_cf::(), &C::key(key))?; Ok(()) } - pub fn put(&mut self, key: C::Index, value: &C::Type) -> Result<()> { + pub fn put( + &mut self, + key: C::Index, + value: &C::Type, + ) -> Result<()> { let serialized_value = serialize(&value)?; self.write_batch .put_cf(self.get_cf::(), &C::key(key), &serialized_value)?; @@ -732,7 +698,7 @@ impl<'a> WriteBatch<'a> { } #[inline] - fn get_cf(&self) -> &'a ColumnFamily { + fn get_cf(&self) -> &'a ColumnFamily { self.map[C::NAME] } diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index cfa542313..0148da0ab 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -60,6 +60,14 @@ pub struct ErasureMeta { pub config: ErasureConfig, } +#[derive(Deserialize, Serialize)] +pub struct DuplicateSlotProof { + #[serde(with = "serde_bytes")] + pub shred1: Vec, + #[serde(with = "serde_bytes")] + pub shred2: Vec, +} + #[derive(Debug, PartialEq)] pub enum ErasureMetaStatus { CanRecover, @@ -209,6 +217,12 @@ impl ErasureMeta { } } +impl DuplicateSlotProof { + pub(crate) fn new(shred1: Vec, shred2: Vec) -> Self { + DuplicateSlotProof { shred1, shred2 } + } +} + #[cfg(test)] mod test { use super::*;