encapsulate meta_cf (#2335)
This commit is contained in:
parent
08924ea36a
commit
bafd90807d
|
@ -20,7 +20,7 @@ use crate::crds_gossip::CrdsGossip;
|
||||||
use crate::crds_gossip_error::CrdsGossipError;
|
use crate::crds_gossip_error::CrdsGossipError;
|
||||||
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
|
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
|
||||||
use crate::crds_value::{CrdsValue, CrdsValueLabel, LeaderId};
|
use crate::crds_value::{CrdsValue, CrdsValueLabel, LeaderId};
|
||||||
use crate::db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT};
|
use crate::db_ledger::DbLedger;
|
||||||
use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
|
use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
|
||||||
use crate::result::Result;
|
use crate::result::Result;
|
||||||
use crate::rpc::RPC_PORT;
|
use crate::rpc::RPC_PORT;
|
||||||
|
@ -871,7 +871,7 @@ impl ClusterInfo {
|
||||||
ix: u64,
|
ix: u64,
|
||||||
) -> Vec<SharedBlob> {
|
) -> Vec<SharedBlob> {
|
||||||
if let Some(db_ledger) = db_ledger {
|
if let Some(db_ledger) = db_ledger {
|
||||||
let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT));
|
let meta = db_ledger.meta();
|
||||||
|
|
||||||
if let Ok(Some(meta)) = meta {
|
if let Ok(Some(meta)) = meta {
|
||||||
let max_slot = meta.received_slot;
|
let max_slot = meta.received_slot;
|
||||||
|
|
|
@ -275,7 +275,7 @@ impl LedgerColumnFamilyRaw for ErasureCf {
|
||||||
pub struct DbLedger {
|
pub struct DbLedger {
|
||||||
// Underlying database is automatically closed in the Drop implementation of DB
|
// Underlying database is automatically closed in the Drop implementation of DB
|
||||||
db: Arc<DB>,
|
db: Arc<DB>,
|
||||||
pub meta_cf: MetaCf,
|
meta_cf: MetaCf,
|
||||||
pub data_cf: DataCf,
|
pub data_cf: DataCf,
|
||||||
pub erasure_cf: ErasureCf,
|
pub erasure_cf: ErasureCf,
|
||||||
}
|
}
|
||||||
|
@ -330,6 +330,10 @@ impl DbLedger {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn meta(&self) -> Result<Option<SlotMeta>> {
|
||||||
|
self.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn destroy(ledger_path: &str) -> Result<()> {
|
pub fn destroy(ledger_path: &str) -> Result<()> {
|
||||||
// DB::destroy() fails if `ledger_path` doesn't exist
|
// DB::destroy() fails if `ledger_path` doesn't exist
|
||||||
create_dir_all(&ledger_path)?;
|
create_dir_all(&ledger_path)?;
|
||||||
|
|
|
@ -32,7 +32,7 @@ pub fn repair(
|
||||||
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
|
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
|
||||||
let rcluster_info = cluster_info.read().unwrap();
|
let rcluster_info = cluster_info.read().unwrap();
|
||||||
let mut is_next_leader = false;
|
let mut is_next_leader = false;
|
||||||
let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))?;
|
let meta = db_ledger.meta()?;
|
||||||
if meta.is_none() {
|
if meta.is_none() {
|
||||||
return Ok(vec![]);
|
return Ok(vec![]);
|
||||||
}
|
}
|
||||||
|
@ -325,8 +325,7 @@ pub fn process_blob(
|
||||||
// then stop
|
// then stop
|
||||||
if max_ix != 0 && !consumed_entries.is_empty() {
|
if max_ix != 0 && !consumed_entries.is_empty() {
|
||||||
let meta = db_ledger
|
let meta = db_ledger
|
||||||
.meta_cf
|
.meta()?
|
||||||
.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))?
|
|
||||||
.expect("Expect metadata to exist if consumed entries is nonzero");
|
.expect("Expect metadata to exist if consumed entries is nonzero");
|
||||||
|
|
||||||
let consumed = meta.consumed;
|
let consumed = meta.consumed;
|
||||||
|
@ -367,7 +366,7 @@ pub fn calculate_max_repair_entry_height(
|
||||||
|
|
||||||
#[cfg(feature = "erasure")]
|
#[cfg(feature = "erasure")]
|
||||||
fn try_erasure(db_ledger: &Arc<DbLedger>, consume_queue: &mut Vec<Entry>) -> Result<()> {
|
fn try_erasure(db_ledger: &Arc<DbLedger>, consume_queue: &mut Vec<Entry>) -> Result<()> {
|
||||||
let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))?;
|
let meta = db_ledger.meta()?;
|
||||||
|
|
||||||
if let Some(meta) = meta {
|
if let Some(meta) = meta {
|
||||||
let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?;
|
let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?;
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
//!
|
//!
|
||||||
use crate::cluster_info::ClusterInfo;
|
use crate::cluster_info::ClusterInfo;
|
||||||
use crate::counter::Counter;
|
use crate::counter::Counter;
|
||||||
use crate::db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT};
|
use crate::db_ledger::DbLedger;
|
||||||
use crate::db_window::*;
|
use crate::db_window::*;
|
||||||
use crate::entry::EntrySender;
|
use crate::entry::EntrySender;
|
||||||
|
|
||||||
|
@ -158,7 +158,7 @@ pub fn window_service(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT));
|
let meta = db_ledger.meta();
|
||||||
|
|
||||||
if let Ok(Some(meta)) = meta {
|
if let Ok(Some(meta)) = meta {
|
||||||
let received = meta.received;
|
let received = meta.received;
|
||||||
|
|
Loading…
Reference in New Issue