Add support in Blockstore for tracking duplicate slots (#7761)

* Add test

* Add new column family to track duplicate slots

* Fix clippy errors

* Introduce new SlotColumn for common implementation of Column trait
This commit is contained in:
carllin 2020-01-13 17:21:39 -08:00 committed by GitHub
parent 156292e408
commit f91ffbbfdf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 219 additions and 142 deletions

1
Cargo.lock generated
View File

@ -3907,6 +3907,7 @@ dependencies = [
"rayon 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rocksdb 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_bytes 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-budget-program 0.23.0",

View File

@ -394,7 +394,9 @@ fn graph_forks(
dot.join("\n")
}
fn analyze_column<T: solana_ledger::blockstore_db::Column>(
fn analyze_column<
T: solana_ledger::blockstore_db::Column + solana_ledger::blockstore_db::ColumnName,
>(
db: &Database,
name: &str,
key_size: usize,

View File

@ -28,6 +28,7 @@ rand_chacha = "0.1.1"
rayon = "1.2.0"
reed-solomon-erasure = { package = "solana-reed-solomon-erasure", version = "4.0.1-3", features = ["simd-accel"] }
serde = "1.0.104"
serde_bytes = "0.11.3"
serde_derive = "1.0.103"
solana-client = { path = "../client", version = "0.23.0" }
solana-genesis-programs = { path = "../genesis-programs", version = "0.23.0" }

View File

@ -77,6 +77,7 @@ pub struct Blockstore {
db: Arc<Database>,
meta_cf: LedgerColumn<cf::SlotMeta>,
dead_slots_cf: LedgerColumn<cf::DeadSlots>,
duplicate_slots_cf: LedgerColumn<cf::DuplicateSlots>,
erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
orphans_cf: LedgerColumn<cf::Orphans>,
index_cf: LedgerColumn<cf::Index>,
@ -179,7 +180,7 @@ impl Blockstore {
// Create the dead slots column family
let dead_slots_cf = db.column();
let duplicate_slots_cf = db.column();
let erasure_meta_cf = db.column();
// Create the orphans column family. An "orphan" is defined as
@ -208,6 +209,7 @@ impl Blockstore {
db,
meta_cf,
dead_slots_cf,
duplicate_slots_cf,
erasure_meta_cf,
orphans_cf,
index_cf,
@ -303,39 +305,43 @@ impl Blockstore {
let columns_empty = self
.db
.delete_range_cf::<cf::SlotMeta>(&mut write_batch, from_slot, to_slot)
.unwrap_or_else(|_| false)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::Root>(&mut write_batch, from_slot, to_slot)
.unwrap_or_else(|_| false)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::ShredData>(&mut write_batch, from_slot, to_slot)
.unwrap_or_else(|_| false)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::ShredCode>(&mut write_batch, from_slot, to_slot)
.unwrap_or_else(|_| false)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::DeadSlots>(&mut write_batch, from_slot, to_slot)
.unwrap_or_else(|_| false)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::DuplicateSlots>(&mut write_batch, from_slot, to_slot)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::ErasureMeta>(&mut write_batch, from_slot, to_slot)
.unwrap_or_else(|_| false)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::Orphans>(&mut write_batch, from_slot, to_slot)
.unwrap_or_else(|_| false)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::Index>(&mut write_batch, from_slot, to_slot)
.unwrap_or_else(|_| false)
.unwrap_or(false)
& self
.db
.delete_range_cf::<cf::TransactionStatus>(&mut write_batch, from_slot, to_slot)
.unwrap_or_else(|_| false);
.unwrap_or(false);
if let Err(e) = self.db.write(write_batch) {
error!(
"Error: {:?} while submitting write batch for slot {:?} retrying...",
@ -368,6 +374,10 @@ impl Blockstore {
.dead_slots_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.duplicate_slots_cf
.compact_range(from_slot, to_slot)
.unwrap_or(false)
&& self
.erasure_meta_cf
.compact_range(from_slot, to_slot)
@ -1626,6 +1636,37 @@ impl Blockstore {
self.dead_slots_cf.put(slot, &true)
}
/// Records proof that `slot` was duplicated: two distinct shred payloads
/// that the leader produced for the same slot.
pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec<u8>, shred2: Vec<u8>) -> Result<()> {
    self.duplicate_slots_cf
        .put(slot, &DuplicateSlotProof::new(shred1, shred2))
}
/// Fetches the stored duplicate-slot proof for `slot`, if one exists.
/// Panics if the underlying column-family read itself fails.
pub fn get_duplicate_slot(&self, slot: u64) -> Option<DuplicateSlotProof> {
    let proof = self.duplicate_slots_cf.get(slot);
    proof.expect("fetch from DuplicateSlots column family failed")
}
// `new_shred` is assumed to have slot and index equal to the given slot and index.
// Returns true if `new_shred` is not equal to the existing shred at the given
// slot and index as this implies the leader generated two different shreds with
// the same slot and index
pub fn is_shred_duplicate(&self, slot: u64, index: u64, new_shred: &[u8]) -> bool {
    // Note: this reads the data-shred column, not DuplicateSlots; the
    // previous expect message named the wrong column family.
    let res = self
        .get_data_shred(slot, index)
        .expect("fetch from ShredData column family failed");
    // No stored shred at (slot, index) means nothing to conflict with.
    res.map(|existing_shred| existing_shred != new_shred)
        .unwrap_or(false)
}
/// Returns true iff a duplicate-slot proof has been recorded for `slot`.
/// Panics if the underlying column-family read fails.
pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool {
    let proof = self
        .duplicate_slots_cf
        .get(slot)
        .expect("fetch from DuplicateSlots column family failed");
    proof.is_some()
}
pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> {
let mut results = vec![];
@ -2411,6 +2452,13 @@ pub mod tests {
.next()
.map(|(slot, _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::DuplicateSlots>(IteratorMode::Start)
.unwrap()
.next()
.map(|(slot, _)| slot >= min_slot)
.unwrap_or(true)
& blockstore
.db
.iter::<cf::ErasureMeta>(IteratorMode::Start)
@ -5193,4 +5241,49 @@ pub mod tests {
let num_coding_in_index = index.coding().num_shreds();
assert_eq!(num_coding_in_index, num_coding);
}
// End-to-end check of duplicate-slot tracking: detect a conflicting shred,
// store the proof, observe the slot flagged, and fetch the proof back.
#[test]
fn test_duplicate_slot() {
    let slot = 0;
    // Two independently generated entry sets for the same slot yield
    // shreds with the same (slot, index) but different payloads.
    let entries1 = make_slot_entries_with_transactions(1);
    let entries2 = make_slot_entries_with_transactions(1);
    let leader_keypair = Arc::new(Keypair::new());
    let shredder = Shredder::new(slot, 0, 1.0, leader_keypair.clone(), 0, 0)
        .expect("Failed in creating shredder");
    let (shreds, _, _) = shredder.entries_to_shreds(&entries1, true, 0);
    let (duplicate_shreds, _, _) = shredder.entries_to_shreds(&entries2, true, 0);
    let shred = shreds[0].clone();
    let duplicate_shred = duplicate_shreds[0].clone();
    // Identical payload to `shred`, so it must NOT count as a duplicate.
    let non_duplicate_shred = shred.clone();
    let blockstore_path = get_tmp_ledger_path!();
    {
        let blockstore = Blockstore::open(&blockstore_path).unwrap();
        blockstore
            .insert_shreds(vec![shred.clone()], None, false)
            .unwrap();
        // No duplicate shreds exist yet
        assert!(!blockstore.has_duplicate_shreds_in_slot(slot));
        // Check if shreds are duplicated
        assert!(blockstore.is_shred_duplicate(slot, 0, &duplicate_shred.payload));
        assert!(!blockstore.is_shred_duplicate(slot, 0, &non_duplicate_shred.payload));
        // Store a duplicate shred
        blockstore
            .store_duplicate_slot(slot, shred.payload.clone(), duplicate_shred.payload.clone())
            .unwrap();
        // Slot is now marked as duplicate
        assert!(blockstore.has_duplicate_shreds_in_slot(slot));
        // Check ability to fetch the duplicates
        let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
        assert_eq!(duplicate_proof.shred1, shred.payload);
        assert_eq!(duplicate_proof.shred2, duplicate_shred.payload);
    }
    // Blockstore must be dropped (scope above) before destroying the path.
    Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
}

View File

@ -21,6 +21,10 @@ const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
const META_CF: &str = "meta";
// Column family for slots that have been marked as dead
const DEAD_SLOTS_CF: &str = "dead_slots";
// Column family for storing proof that there were multiple
// versions of a slot
const DUPLICATE_SLOTS_CF: &str = "duplicate_slots";
// Column family storing erasure metadata for a slot
const ERASURE_META_CF: &str = "erasure_meta";
// Column family for orphans data
const ORPHANS_CF: &str = "orphans";
@ -62,17 +66,21 @@ pub enum IteratorMode<Index> {
pub mod columns {
#[derive(Debug)]
/// SlotMeta Column
/// The slot metadata column
pub struct SlotMeta;
#[derive(Debug)]
/// Orphans Column
/// The orphans column
pub struct Orphans;
#[derive(Debug)]
/// Data Column
/// The dead slots column
pub struct DeadSlots;
#[derive(Debug)]
/// The duplicate slots column
pub struct DuplicateSlots;
#[derive(Debug)]
/// The erasure meta column
pub struct ErasureMeta;
@ -104,8 +112,8 @@ struct Rocks(rocksdb::DB);
impl Rocks {
fn open(path: &Path) -> Result<Rocks> {
use columns::{
DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta,
TransactionStatus,
DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData,
SlotMeta, TransactionStatus,
};
fs::create_dir_all(&path)?;
@ -117,6 +125,8 @@ impl Rocks {
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
let dead_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
let duplicate_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options());
let erasure_meta_cf_descriptor =
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
@ -132,6 +142,7 @@ impl Rocks {
let cfs = vec![
meta_cf_descriptor,
dead_slots_cf_descriptor,
duplicate_slots_cf_descriptor,
erasure_meta_cf_descriptor,
orphans_cf_descriptor,
root_cf_descriptor,
@ -149,13 +160,14 @@ impl Rocks {
fn columns(&self) -> Vec<&'static str> {
use columns::{
DeadSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData, SlotMeta,
TransactionStatus,
DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans, Root, ShredCode, ShredData,
SlotMeta, TransactionStatus,
};
vec![
ErasureMeta::NAME,
DeadSlots::NAME,
DuplicateSlots::NAME,
Index::NAME,
Orphans::NAME,
Root::NAME,
@ -226,7 +238,6 @@ impl Rocks {
}
pub trait Column {
const NAME: &'static str;
type Index;
fn key_size() -> usize {
@ -239,6 +250,10 @@ pub trait Column {
fn as_index(slot: Slot) -> Self::Index;
}
/// Supplies the RocksDB column-family name for a column type.
/// Kept separate from `Column` so key-encoding logic can be shared via
/// blanket impls (see `SlotColumn`) while each column keeps a unique name.
pub trait ColumnName {
    const NAME: &'static str;
}
pub trait TypedColumn: Column {
type Type: Serialize + DeserializeOwned;
}
@ -247,8 +262,31 @@ impl TypedColumn for columns::TransactionStatus {
type Type = RpcTransactionStatus;
}
/// Marker trait for columns keyed solely by a slot number (u64); a blanket
/// impl provides the common big-endian `Column` key encoding for them.
pub trait SlotColumn<Index = u64> {}
/// Shared `Column` implementation for every slot-keyed column: the key is
/// the slot encoded as 8 big-endian bytes, and the index IS the slot.
impl<T: SlotColumn> Column for T {
    type Index = u64;

    // Big-endian so that lexicographic key order matches numeric slot order.
    fn key(slot: u64) -> Vec<u8> {
        slot.to_be_bytes().to_vec()
    }

    fn index(key: &[u8]) -> u64 {
        let mut slot_bytes = [0u8; 8];
        slot_bytes.copy_from_slice(&key[..8]);
        u64::from_be_bytes(slot_bytes)
    }

    fn slot(index: u64) -> Slot {
        index
    }

    fn as_index(slot: Slot) -> u64 {
        slot
    }
}
impl Column for columns::TransactionStatus {
const NAME: &'static str = TRANSACTION_STATUS_CF;
type Index = (Slot, Signature);
fn key((slot, index): (Slot, Signature)) -> Vec<u8> {
@ -273,8 +311,11 @@ impl Column for columns::TransactionStatus {
}
}
impl ColumnName for columns::TransactionStatus {
const NAME: &'static str = TRANSACTION_STATUS_CF;
}
impl Column for columns::ShredCode {
const NAME: &'static str = CODE_SHRED_CF;
type Index = (u64, u64);
fn key(index: (u64, u64)) -> Vec<u8> {
@ -294,8 +335,11 @@ impl Column for columns::ShredCode {
}
}
impl ColumnName for columns::ShredCode {
const NAME: &'static str = CODE_SHRED_CF;
}
impl Column for columns::ShredData {
const NAME: &'static str = DATA_SHRED_CF;
type Index = (u64, u64);
fn key((slot, index): (u64, u64)) -> Vec<u8> {
@ -320,143 +364,59 @@ impl Column for columns::ShredData {
}
}
impl Column for columns::Index {
const NAME: &'static str = INDEX_CF;
type Index = u64;
fn key(slot: Slot) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
impl ColumnName for columns::ShredData {
const NAME: &'static str = DATA_SHRED_CF;
}
impl SlotColumn for columns::Index {}
impl ColumnName for columns::Index {
const NAME: &'static str = INDEX_CF;
}
impl TypedColumn for columns::Index {
type Type = blockstore_meta::Index;
}
impl Column for columns::DeadSlots {
impl SlotColumn for columns::DeadSlots {}
impl ColumnName for columns::DeadSlots {
const NAME: &'static str = DEAD_SLOTS_CF;
type Index = u64;
fn key(slot: Slot) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn for columns::DeadSlots {
type Type = bool;
}
impl Column for columns::Orphans {
const NAME: &'static str = ORPHANS_CF;
type Index = u64;
fn key(slot: Slot) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
// DuplicateSlots is keyed by slot (shared key encoding via SlotColumn) ...
impl SlotColumn for columns::DuplicateSlots {}
// ... lives in its own column family ...
impl ColumnName for columns::DuplicateSlots {
    const NAME: &'static str = DUPLICATE_SLOTS_CF;
}
// ... and stores a serialized proof of two conflicting shred payloads.
impl TypedColumn for columns::DuplicateSlots {
    type Type = blockstore_meta::DuplicateSlotProof;
}
impl SlotColumn for columns::Orphans {}
impl ColumnName for columns::Orphans {
const NAME: &'static str = ORPHANS_CF;
}
impl TypedColumn for columns::Orphans {
type Type = bool;
}
impl Column for columns::Root {
impl SlotColumn for columns::Root {}
impl ColumnName for columns::Root {
const NAME: &'static str = ROOT_CF;
type Index = u64;
fn key(slot: Slot) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn for columns::Root {
type Type = bool;
}
impl Column for columns::SlotMeta {
impl SlotColumn for columns::SlotMeta {}
impl ColumnName for columns::SlotMeta {
const NAME: &'static str = META_CF;
type Index = u64;
fn key(slot: Slot) -> Vec<u8> {
let mut key = vec![0; 8];
BigEndian::write_u64(&mut key[..], slot);
key
}
fn index(key: &[u8]) -> u64 {
BigEndian::read_u64(&key[..8])
}
fn slot(index: Self::Index) -> Slot {
index
}
fn as_index(slot: Slot) -> Self::Index {
slot
}
}
impl TypedColumn for columns::SlotMeta {
type Type = blockstore_meta::SlotMeta;
}
impl Column for columns::ErasureMeta {
const NAME: &'static str = ERASURE_META_CF;
type Index = (u64, u64);
fn index(key: &[u8]) -> (u64, u64) {
@ -481,7 +441,9 @@ impl Column for columns::ErasureMeta {
(slot, 0)
}
}
impl ColumnName for columns::ErasureMeta {
const NAME: &'static str = ERASURE_META_CF;
}
impl TypedColumn for columns::ErasureMeta {
type Type = blockstore_meta::ErasureMeta;
}
@ -524,7 +486,7 @@ impl Database {
pub fn get<C>(&self, key: C::Index) -> Result<Option<C::Type>>
where
C: TypedColumn,
C: TypedColumn + ColumnName,
{
if let Some(serialized_value) = self.backend.get_cf(self.cf_handle::<C>(), &C::key(key))? {
let value = deserialize(&serialized_value)?;
@ -540,7 +502,7 @@ impl Database {
iterator_mode: IteratorMode<C::Index>,
) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + 'a>
where
C: Column,
C: Column + ColumnName,
{
let cf = self.cf_handle::<C>();
let iter = self.backend.iterator_cf::<C>(cf, iterator_mode)?;
@ -548,16 +510,16 @@ impl Database {
}
#[inline]
pub fn cf_handle<C>(&self) -> &ColumnFamily
pub fn cf_handle<C: ColumnName>(&self) -> &ColumnFamily
where
C: Column,
C: Column + ColumnName,
{
self.backend.cf_handle(C::NAME)
}
pub fn column<C>(&self) -> LedgerColumn<C>
where
C: Column,
C: Column + ColumnName,
{
LedgerColumn {
backend: Arc::clone(&self.backend),
@ -594,7 +556,7 @@ impl Database {
// its end
pub fn delete_range_cf<C>(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<bool>
where
C: Column,
C: Column + ColumnName,
{
let cf = self.cf_handle::<C>();
let from_index = C::as_index(from);
@ -612,7 +574,7 @@ impl Database {
impl<C> LedgerColumn<C>
where
C: Column,
C: Column + ColumnName,
{
pub fn get_bytes(&self, key: C::Index) -> Result<Option<Vec<u8>>> {
self.backend.get_cf(self.handle(), &C::key(key))
@ -634,7 +596,7 @@ where
to: Option<Slot>,
) -> Result<bool>
where
C::Index: PartialOrd + Copy,
C::Index: PartialOrd + Copy + ColumnName,
{
let mut end = true;
let iter_config = match from {
@ -691,7 +653,7 @@ where
impl<C> LedgerColumn<C>
where
C: TypedColumn,
C: TypedColumn + ColumnName,
{
pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> {
if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? {
@ -712,19 +674,23 @@ where
}
impl<'a> WriteBatch<'a> {
pub fn put_bytes<C: Column>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
pub fn put_bytes<C: Column + ColumnName>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
self.write_batch
.put_cf(self.get_cf::<C>(), &C::key(key), bytes)?;
Ok(())
}
pub fn delete<C: Column>(&mut self, key: C::Index) -> Result<()> {
pub fn delete<C: Column + ColumnName>(&mut self, key: C::Index) -> Result<()> {
self.write_batch
.delete_cf(self.get_cf::<C>(), &C::key(key))?;
Ok(())
}
pub fn put<C: TypedColumn>(&mut self, key: C::Index, value: &C::Type) -> Result<()> {
pub fn put<C: TypedColumn + ColumnName>(
&mut self,
key: C::Index,
value: &C::Type,
) -> Result<()> {
let serialized_value = serialize(&value)?;
self.write_batch
.put_cf(self.get_cf::<C>(), &C::key(key), &serialized_value)?;
@ -732,7 +698,7 @@ impl<'a> WriteBatch<'a> {
}
#[inline]
fn get_cf<C: Column>(&self) -> &'a ColumnFamily {
fn get_cf<C: Column + ColumnName>(&self) -> &'a ColumnFamily {
self.map[C::NAME]
}

View File

@ -60,6 +60,14 @@ pub struct ErasureMeta {
pub config: ErasureConfig,
}
#[derive(Deserialize, Serialize)]
/// Proof that a slot was duplicated: two distinct shred payloads recorded
/// for the same slot.
pub struct DuplicateSlotProof {
    // serde_bytes serializes the Vec<u8> as one compact byte string
    // instead of a sequence of individual u8 values.
    #[serde(with = "serde_bytes")]
    pub shred1: Vec<u8>,
    #[serde(with = "serde_bytes")]
    pub shred2: Vec<u8>,
}
#[derive(Debug, PartialEq)]
pub enum ErasureMetaStatus {
CanRecover,
@ -209,6 +217,12 @@ impl ErasureMeta {
}
}
impl DuplicateSlotProof {
pub(crate) fn new(shred1: Vec<u8>, shred2: Vec<u8>) -> Self {
DuplicateSlotProof { shred1, shred2 }
}
}
#[cfg(test)]
mod test {
use super::*;