Revert shred fs (#9712)

* Revert "Untar is called for shred archives that do not exist. (#9565)"
  This reverts commit 729cb5eec6.
* Revert "Dont insert shred payload into rocksdb (#9366)"
  This reverts commit 5ed39de8c5.

parent 50f1ec0374
commit fa20963b93
@@ -238,7 +238,7 @@ impl CrdsGossipPull {
             if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0)
                 || now + timeout < r.wallclock()
             {
-                inc_new_counter_info!(
+                inc_new_counter_warn!(
                     "cluster_info-gossip_pull_response_value_timeout",
                     1
                 );
@@ -250,7 +250,7 @@ impl CrdsGossipPull {
             // Before discarding this value, check if a ContactInfo for the owner
             // exists in the table. If it doesn't, that implies that this value can be discarded
             if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
-                inc_new_counter_info!(
+                inc_new_counter_warn!(
                     "cluster_info-gossip_pull_response_value_timeout",
                     1
                 );
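Many hunks in this revert are one-line swaps of inc_new_counter_info! back to inc_new_counter_warn!. Both solana_metrics macros bump the same counter; they differ only in the log level the datapoint is tagged with. A minimal sketch of that level-tagged counter pattern, using a hypothetical macro name (this is not the actual solana_metrics source, and it assumes the log and env_logger crates):

    // Sketch only: a counter macro parameterized by log level, mirroring the
    // info/warn split between inc_new_counter_info! and inc_new_counter_warn!.
    macro_rules! inc_counter_at_level {
        ($name:expr, $count:expr, $level:expr) => {{
            log::log!($level, "COUNTER {} += {}", $name, $count);
        }};
    }

    fn main() {
        env_logger::init();
        // The revert restores the warn-level variant for this counter.
        inc_counter_at_level!("cluster_info-gossip_pull_response_value_timeout", 1, log::Level::Warn);
    }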
@@ -131,6 +131,7 @@ impl LedgerCleanupService {
         while let Ok(new_root) = new_root_receiver.try_recv() {
             root = new_root;
         }
+
         if root - *last_purge_slot > purge_interval {
             let disk_utilization_pre = blockstore.storage_size();
             info!(
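For orientation, this loop drains the root channel down to the newest root and only purges once the root has advanced more than purge_interval slots past the last purge. A tiny sketch of that cadence check (hypothetical standalone function; the real code compares root - *last_purge_slot inline):

    // Purge only when the newest root is more than purge_interval slots
    // past the slot we last purged at.
    fn should_purge(root: u64, last_purge_slot: u64, purge_interval: u64) -> bool {
        root.saturating_sub(last_purge_slot) > purge_interval
    }

    fn main() {
        assert!(!should_purge(105, 100, 10)); // only 5 slots of progress: skip
        assert!(should_purge(120, 100, 10));  // 20 slots of progress: purge
        println!("ok");
    }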
@@ -354,7 +354,7 @@ impl PohRecorder {
     pub fn tick(&mut self) {
         let now = Instant::now();
         let poh_entry = self.poh.lock().unwrap().tick();
-        inc_new_counter_info!(
+        inc_new_counter_warn!(
             "poh_recorder-tick_lock_contention",
             timing::duration_as_us(&now.elapsed()) as usize
         );
@@ -364,7 +364,7 @@ impl PohRecorder {
         trace!("tick_height {}", self.tick_height);

         if self.leader_first_tick_height.is_none() {
-            inc_new_counter_info!(
+            inc_new_counter_warn!(
                 "poh_recorder-tick_overhead",
                 timing::duration_as_us(&now.elapsed()) as usize
             );
@@ -380,7 +380,7 @@ impl PohRecorder {
             self.tick_cache.push((entry, self.tick_height));
             let _ = self.flush_cache(true);
         }
-        inc_new_counter_info!(
+        inc_new_counter_warn!(
             "poh_recorder-tick_overhead",
             timing::duration_as_us(&now.elapsed()) as usize
         );
@@ -409,13 +409,13 @@ impl PohRecorder {
         {
             let now = Instant::now();
             let mut poh_lock = self.poh.lock().unwrap();
-            inc_new_counter_info!(
+            inc_new_counter_warn!(
                 "poh_recorder-record_lock_contention",
                 timing::duration_as_us(&now.elapsed()) as usize
             );
             let now = Instant::now();
             let res = poh_lock.record(mixin);
-            inc_new_counter_info!(
+            inc_new_counter_warn!(
                 "poh_recorder-record_ms",
                 timing::duration_as_us(&now.elapsed()) as usize
             );
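Both record-path counters follow the same pattern: take a timestamp, perform the guarded step, and report the elapsed microseconds, so lock-acquisition time (contention) and the record itself are measured separately. A self-contained sketch of that measurement pattern, with the counters replaced by prints:

    use std::sync::Mutex;
    use std::time::Instant;

    fn main() {
        let poh = Mutex::new(0u64);

        // Time the lock acquisition; under contention this measures the wait.
        let now = Instant::now();
        let mut poh_lock = poh.lock().unwrap();
        println!("record_lock_contention_us: {}", now.elapsed().as_micros());

        // Time the work done while holding the lock
        // (a stand-in for poh_lock.record(mixin)).
        let now = Instant::now();
        *poh_lock += 1;
        println!("record_us: {}", now.elapsed().as_micros());
    }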
@@ -262,7 +262,7 @@ impl RepairService {
                 } else if slot_meta.consumed == slot_meta.received {
                     vec![RepairType::HighestShred(slot, slot_meta.received)]
                 } else {
-                    let reqs = blockstore.find_missing_data_indexes_ts(
+                    let reqs = blockstore.find_missing_data_indexes(
                         slot,
                         slot_meta.first_shred_timestamp,
                         slot_meta.consumed,
@@ -1,5 +1,4 @@
 use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
-use solana_ledger::blockstore::Blockstore;
 use solana_ledger::{snapshot_package::AccountsPackageReceiver, snapshot_utils};
 use solana_sdk::{clock::Slot, hash::Hash};
 use std::{
@@ -22,7 +21,6 @@ impl SnapshotPackagerService {
         starting_snapshot_hash: Option<(Slot, Hash)>,
         exit: &Arc<AtomicBool>,
         cluster_info: &Arc<ClusterInfo>,
-        blockstore: Option<Arc<Blockstore>>,
     ) -> Self {
         let exit = exit.clone();
         let cluster_info = cluster_info.clone();
@@ -59,9 +57,6 @@ impl SnapshotPackagerService {
                         }
                         cluster_info.push_snapshot_hashes(hashes.clone());
                     }
-                    if let Some(ref blockstore) = blockstore {
-                        let _ = blockstore.tar_shreds(snapshot_package.root);
-                    }
                 }
                 Err(RecvTimeoutError::Disconnected) => break,
                 Err(RecvTimeoutError::Timeout) => (),
@@ -291,7 +291,6 @@ pub mod tests {
             Blockstore::open_with_signal(&blockstore_path)
                 .expect("Expected to successfully open ledger");
         let blockstore = Arc::new(blockstore);
-
         let bank = bank_forks.working_bank();
         let (exit, poh_recorder, poh_service, _entry_receiver) =
             create_test_recorder(&bank, &blockstore, None);
@@ -198,10 +198,6 @@ impl Validator {
         let bank_info = &bank_forks_info[0];
         let bank = bank_forks[bank_info.bank_slot].clone();

-        blockstore
-            .reconcile_shreds(Some(&leader_schedule_cache))
-            .expect("Expected to successfully reconcile shreds");
-
         info!("Starting validator from slot {}", bank.slot());
         {
             let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect();
@@ -376,13 +372,8 @@ impl Validator {
         if config.snapshot_config.is_some() {
             // Start a snapshot packaging service
             let (sender, receiver) = channel();
-            let snapshot_packager_service = SnapshotPackagerService::new(
-                receiver,
-                snapshot_hash,
-                &exit,
-                &cluster_info,
-                Some(blockstore.clone()),
-            );
+            let snapshot_packager_service =
+                SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info);
             (Some(snapshot_packager_service), Some(sender))
         } else {
             (None, None)
@@ -321,7 +321,7 @@ mod tests {
         let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));

         let snapshot_packager_service =
-            SnapshotPackagerService::new(receiver, None, &exit, &cluster_info, None);
+            SnapshotPackagerService::new(receiver, None, &exit, &cluster_info);

         // Close the channel so that the package service will exit after reading all the
         // packages off the channel
@@ -494,6 +494,8 @@ fn analyze_storage(database: &Database) -> Result<(), String> {
     analyze_column::<ErasureMeta>(database, "ErasureMeta", ErasureMeta::key_size())?;
     analyze_column::<Root>(database, "Root", Root::key_size())?;
     analyze_column::<Index>(database, "Index", Index::key_size())?;
+    analyze_column::<ShredData>(database, "ShredData", ShredData::key_size())?;
+    analyze_column::<ShredCode>(database, "ShredCode", ShredCode::key_size())?;
     analyze_column::<TransactionStatus>(
         database,
         "TransactionStatus",
[File diff suppressed because it is too large]
@@ -43,7 +43,7 @@ pub struct Index {
 #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
 pub struct ShredIndex {
     /// Map representing presence/absence of shreds
-    pub index: BTreeSet<u64>,
+    index: BTreeSet<u64>,
 }

 #[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
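ShredIndex records which shred indexes are present in a slot as a set, and generating repair requests (see the RepairService hunk above) reduces to scanning a range for absent entries. A hedged sketch of that idea; the function name and bounds here are illustrative, not Blockstore's exact find_missing_data_indexes signature:

    use std::collections::BTreeSet;

    // Report up to max_missing shred indexes in [start, end) that are absent.
    fn find_missing(index: &BTreeSet<u64>, start: u64, end: u64, max_missing: usize) -> Vec<u64> {
        (start..end)
            .filter(|i| !index.contains(i))
            .take(max_missing)
            .collect()
    }

    fn main() {
        let present: BTreeSet<u64> = [0, 1, 3, 4, 7].into_iter().collect();
        assert_eq!(find_missing(&present, 0, 8, 10), vec![2, 5, 6]);
        println!("ok");
    }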
@@ -206,9 +206,6 @@ fn is_valid_genesis_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool
         (["rocksdb"], Directory) => true,
         (["rocksdb", ..], GNUSparse) => true,
         (["rocksdb", ..], Regular) => true,
-        (["shreds", ..], Directory) => true,
-        (["shreds", ..], GNUSparse) => true,
-        (["shreds", ..], Regular) => true,
         _ => false,
     }
 }
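With the shred-fs work reverted, the genesis archive no longer contains a shreds/ directory, so its entries are dropped from the whitelist. A simplified sketch of the whitelist approach: the real function matches on tar::EntryType, while this version substitutes a plain is_dir flag, and the genesis.bin arm is an assumption added for illustration:

    // Accept only entry paths whose shape we expect inside the archive.
    fn is_valid_entry(parts: &[&str], is_dir: bool) -> bool {
        match (parts, is_dir) {
            (["genesis.bin"], false) => true, // assumed arm, for illustration
            (["rocksdb"], true) => true,
            (["rocksdb", ..], false) => true,
            _ => false,
        }
    }

    fn main() {
        assert!(is_valid_entry(&["rocksdb", "CURRENT"], false));
        assert!(!is_valid_entry(&["shreds", "0"], false)); // rejected after the revert
        println!("ok");
    }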
@@ -5,7 +5,7 @@ use solana_client::rpc_client::RpcClient;
 use solana_client::thin_client::create_client;
 use solana_core::{
     broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
-    contact_info::ContactInfo, gossip_service::discover_cluster, validator::ValidatorConfig,
+    gossip_service::discover_cluster, validator::ValidatorConfig,
 };
 use solana_download_utils::download_snapshot;
 use solana_ledger::bank_forks::CompressionType;
@@ -274,7 +274,7 @@ fn run_cluster_partition(
     for node in &cluster_nodes {
         let node_client = RpcClient::new_socket(node.rpc);
         if let Ok(epoch_info) = node_client.get_epoch_info() {
-            debug!("slots_per_epoch: {:?}", epoch_info);
+            info!("slots_per_epoch: {:?}", epoch_info);
             if epoch_info.slots_in_epoch <= (1 << VOTE_THRESHOLD_DEPTH) {
                 reached_epoch = false;
                 break;
@@ -343,16 +343,13 @@ fn run_cluster_partition(
         alive_node_contact_infos.len(),
     )
     .unwrap();
-    assert!(wait_for_new_roots(&alive_node_contact_infos, 1024, 16));
     info!("PARTITION_TEST discovered {} nodes", cluster_nodes.len());
-}
-
-pub fn wait_for_new_roots(nodes: &[ContactInfo], mut tries: usize, min_roots: usize) -> bool {
-    info!("looking for new roots on all nodes");
-    let mut roots = vec![HashSet::new(); nodes.len()];
+    info!("PARTITION_TEST looking for new roots on all nodes");
+    let mut roots = vec![HashSet::new(); alive_node_contact_infos.len()];
+    let mut done = false;
     let mut last_print = Instant::now();
-    while tries > 0 {
-        for (i, ingress_node) in nodes.iter().enumerate() {
+    while !done {
+        for (i, ingress_node) in alive_node_contact_infos.iter().enumerate() {
             let client = create_client(
                 ingress_node.client_facing_addr(),
                 solana_core::cluster_info::VALIDATOR_PORT_RANGE,
@@ -361,24 +358,14 @@ pub fn wait_for_new_roots(nodes: &[ContactInfo], mut tries: usize, min_roots: usize) -> bool {
                 roots[i].insert(slot);
             let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0);
             if last_print.elapsed().as_secs() > 3 {
-                info!(
-                    "{}: min observed roots {}/{} in {} nodes",
-                    tries,
-                    min_node,
-                    min_roots,
-                    roots.len()
-                );
+                info!("PARTITION_TEST min observed roots {}/16", min_node);
                 last_print = Instant::now();
             }
-            if min_node >= min_roots {
-                return true;
-            }
+            done = min_node >= 16;
         }
         sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2));
-        tries -= 1;
     }
-    info!("failed waiting for roots");
-    false
+    info!("PARTITION_TEST done waiting for roots");
 }

 #[allow(unused_attributes)]
@@ -876,7 +863,6 @@ fn test_snapshot_download() {
 #[test]
 #[serial]
 fn test_snapshot_restart_tower() {
-    solana_logger::setup();
     // First set up the cluster with 2 nodes
     let snapshot_interval_slots = 10;
     let num_account_paths = 2;
@@ -934,11 +920,12 @@ fn test_snapshot_restart_tower() {
     // Use the restarted node as the discovery point so that we get updated
     // validator's ContactInfo
     let restarted_node_info = cluster.get_contact_info(&validator_id).unwrap();
-
-    let (cluster_nodes, _) =
-        discover_cluster(&restarted_node_info.gossip, cluster.validators.len()).unwrap();
-
-    assert!(wait_for_new_roots(&cluster_nodes, 512, 16));
+    cluster_tests::spend_and_verify_all_nodes(
+        &restarted_node_info,
+        &cluster.funding_keypair,
+        1,
+        HashSet::new(),
+    );
 }

 #[test]