Don't insert shred payload into rocksdb (#9366)

automerge
anatoly yakovenko 2020-04-16 18:20:55 -07:00 committed by GitHub
parent 66abe45ea1
commit 5ed39de8c5
13 changed files with 501 additions and 322 deletions
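
The thrust of the change, as far as this view shows: shred payloads no longer live in RocksDB columns. The database keeps only the per-slot ShredIndex, while the payload bytes are kept as files that can be tarred into snapshot archives (hence the new tar_shreds and reconcile_shreds calls below, and the "shreds" entries accepted by archive validation). The blockstore.rs diff itself is suppressed as too large, so the sketch that follows is only an illustration of the idea, assuming per-slot files under a shreds/ directory in the ledger path; it is not the actual implementation.

use std::fs::{create_dir_all, OpenOptions};
use std::io::Write;
use std::path::Path;

// Illustration only: append a shred's payload to a per-slot file instead of a
// RocksDB column; the ShredIndex kept in the database records which shred
// indexes exist for the slot.
fn write_shred_to_file(ledger_path: &Path, slot: u64, payload: &[u8]) -> std::io::Result<()> {
    let dir = ledger_path.join("shreds");
    create_dir_all(&dir)?;
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(dir.join(slot.to_string()))?;
    file.write_all(payload)
}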


@@ -238,7 +238,7 @@ impl CrdsGossipPull {
if now > r.wallclock().checked_add(timeout).unwrap_or_else(|| 0)
|| now + timeout < r.wallclock()
{
inc_new_counter_warn!(
inc_new_counter_info!(
"cluster_info-gossip_pull_response_value_timeout",
1
);
@@ -250,7 +250,7 @@ impl CrdsGossipPull {
// Before discarding this value, check if a ContactInfo for the owner
// exists in the table. If it doesn't, that implies that this value can be discarded
if crds.lookup(&CrdsValueLabel::ContactInfo(owner)).is_none() {
inc_new_counter_warn!(
inc_new_counter_info!(
"cluster_info-gossip_pull_response_value_timeout",
1
);


@@ -131,7 +131,6 @@ impl LedgerCleanupService {
while let Ok(new_root) = new_root_receiver.try_recv() {
root = new_root;
}
if root - *last_purge_slot > purge_interval {
let disk_utilization_pre = blockstore.storage_size();
info!(


@@ -354,7 +354,7 @@ impl PohRecorder {
pub fn tick(&mut self) {
let now = Instant::now();
let poh_entry = self.poh.lock().unwrap().tick();
inc_new_counter_warn!(
inc_new_counter_info!(
"poh_recorder-tick_lock_contention",
timing::duration_as_us(&now.elapsed()) as usize
);
@@ -364,7 +364,7 @@ impl PohRecorder {
trace!("tick_height {}", self.tick_height);
if self.leader_first_tick_height.is_none() {
inc_new_counter_warn!(
inc_new_counter_info!(
"poh_recorder-tick_overhead",
timing::duration_as_us(&now.elapsed()) as usize
);
@@ -380,7 +380,7 @@ impl PohRecorder {
self.tick_cache.push((entry, self.tick_height));
let _ = self.flush_cache(true);
}
inc_new_counter_warn!(
inc_new_counter_info!(
"poh_recorder-tick_overhead",
timing::duration_as_us(&now.elapsed()) as usize
);
@@ -409,13 +409,13 @@ impl PohRecorder {
{
let now = Instant::now();
let mut poh_lock = self.poh.lock().unwrap();
inc_new_counter_warn!(
inc_new_counter_info!(
"poh_recorder-record_lock_contention",
timing::duration_as_us(&now.elapsed()) as usize
);
let now = Instant::now();
let res = poh_lock.record(mixin);
inc_new_counter_warn!(
inc_new_counter_info!(
"poh_recorder-record_ms",
timing::duration_as_us(&now.elapsed()) as usize
);


@@ -262,7 +262,7 @@ impl RepairService {
} else if slot_meta.consumed == slot_meta.received {
vec![RepairType::HighestShred(slot, slot_meta.received)]
} else {
let reqs = blockstore.find_missing_data_indexes(
let reqs = blockstore.find_missing_data_indexes_ts(
slot,
slot_meta.first_shred_timestamp,
slot_meta.consumed,


@@ -1,4 +1,5 @@
use crate::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES};
use solana_ledger::blockstore::Blockstore;
use solana_ledger::{snapshot_package::AccountsPackageReceiver, snapshot_utils};
use solana_sdk::{clock::Slot, hash::Hash};
use std::{
@@ -21,6 +22,7 @@ impl SnapshotPackagerService {
starting_snapshot_hash: Option<(Slot, Hash)>,
exit: &Arc<AtomicBool>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
blockstore: Option<Arc<Blockstore>>,
) -> Self {
let exit = exit.clone();
let cluster_info = cluster_info.clone();
@@ -63,6 +65,9 @@ impl SnapshotPackagerService {
.unwrap()
.push_snapshot_hashes(hashes.clone());
}
if let Some(ref blockstore) = blockstore {
let _ = blockstore.tar_shreds(snapshot_package.root);
}
}
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => (),
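
The packager is now handed an Option<Arc<Blockstore>> so that, right after the snapshot hashes are pushed to cluster_info, it can archive the shred files for slots up to the snapshot root. tar_shreds is defined in the suppressed blockstore.rs diff; the sketch below only illustrates the shape such a call might take, assuming per-slot files under shreds/ and shelling out to tar, not the real implementation.

use std::path::Path;
use std::process::Command;

// Illustration only: bundle every per-slot shred file at or below `root` into
// shreds.tar next to the ledger.
fn tar_shreds_sketch(ledger_path: &Path, root: u64) -> std::io::Result<()> {
    let shreds_dir = ledger_path.join("shreds");
    let files: Vec<String> = std::fs::read_dir(&shreds_dir)?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| entry.file_name().into_string().ok())
        .filter(|name| name.parse::<u64>().map(|slot| slot <= root).unwrap_or(false))
        .collect();
    let _ = Command::new("tar")
        .arg("-C")
        .arg(&shreds_dir)
        .arg("-cf")
        .arg(ledger_path.join("shreds.tar"))
        .args(&files)
        .status()?;
    Ok(())
}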


@@ -293,6 +293,7 @@ pub mod tests {
Blockstore::open_with_signal(&blockstore_path)
.expect("Expected to successfully open ledger");
let blockstore = Arc::new(blockstore);
let bank = bank_forks.working_bank();
let (exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank, &blockstore, None);


@@ -198,6 +198,10 @@ impl Validator {
let bank_info = &bank_forks_info[0];
let bank = bank_forks[bank_info.bank_slot].clone();
blockstore
.reconcile_shreds(Some(&leader_schedule_cache))
.expect("Expected to successfully reconcile shreds");
info!("Starting validator from slot {}", bank.slot());
{
let hard_forks: Vec<_> = bank.hard_forks().read().unwrap().iter().copied().collect();
@@ -376,8 +380,13 @@ impl Validator {
if config.snapshot_config.is_some() {
// Start a snapshot packaging service
let (sender, receiver) = channel();
let snapshot_packager_service =
SnapshotPackagerService::new(receiver, snapshot_hash, &exit, &cluster_info);
let snapshot_packager_service = SnapshotPackagerService::new(
receiver,
snapshot_hash,
&exit,
&cluster_info,
Some(blockstore.clone()),
);
(Some(snapshot_packager_service), Some(sender))
} else {
(None, None)
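
At startup the validator now reconciles the on-disk shred files with the blockstore before announcing its starting slot, passing in the leader schedule cache (presumably so any re-ingested shreds can be checked against the expected slot leaders). reconcile_shreds itself is part of the suppressed blockstore.rs diff; only the call site is visible here. A minimal sketch of the kind of check such a pass might perform, under the same per-slot-file assumption as above:

use std::path::Path;

// Illustration only: confirm that every slot the database's index claims to
// have shreds for actually has a backing file on disk.
fn reconcile_shreds_sketch(ledger_path: &Path, indexed_slots: &[u64]) -> std::io::Result<()> {
    let shreds_dir = ledger_path.join("shreds");
    for slot in indexed_slots {
        let path = shreds_dir.join(slot.to_string());
        if !path.exists() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::NotFound,
                format!("missing shred file for slot {}", slot),
            ));
        }
    }
    Ok(())
}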


@@ -324,7 +324,7 @@ mod tests {
)));
let snapshot_packager_service =
SnapshotPackagerService::new(receiver, None, &exit, &cluster_info);
SnapshotPackagerService::new(receiver, None, &exit, &cluster_info, None);
// Close the channel so that the package service will exit after reading all the
// packages off the channel


@@ -494,8 +494,6 @@ fn analyze_storage(database: &Database) -> Result<(), String> {
analyze_column::<ErasureMeta>(database, "ErasureMeta", ErasureMeta::key_size())?;
analyze_column::<Root>(database, "Root", Root::key_size())?;
analyze_column::<Index>(database, "Index", Index::key_size())?;
analyze_column::<ShredData>(database, "ShredData", ShredData::key_size())?;
analyze_column::<ShredCode>(database, "ShredCode", ShredCode::key_size())?;
analyze_column::<TransactionStatus>(
database,
"TransactionStatus",

File diff suppressed because it is too large


@@ -43,7 +43,7 @@ pub struct Index {
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)]
pub struct ShredIndex {
/// Map representing presence/absence of shreds
index: BTreeSet<u64>,
pub index: BTreeSet<u64>,
}
#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
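
Making the index set public presumably lets code outside this module (for example, the shred reconciliation above) read the per-slot ShredIndex directly and compare it with what is actually on disk. A small sketch under that assumption; ShredIndex mirrors the struct in the diff, while the comparison helper is hypothetical:

use std::collections::BTreeSet;

// Per-slot shred presence map, as in the diff.
#[derive(Clone, Debug, Default)]
pub struct ShredIndex {
    pub index: BTreeSet<u64>,
}

// Hypothetical helper: which shred indexes does the database expect that were
// not recovered from the on-disk shred files?
fn missing_from_files(db_index: &ShredIndex, on_disk: &BTreeSet<u64>) -> Vec<u64> {
    db_index.index.difference(on_disk).copied().collect()
}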


@@ -206,6 +206,9 @@ fn is_valid_genesis_archive_entry(parts: &[&str], kind: tar::EntryType) -> bool
(["rocksdb"], Directory) => true,
(["rocksdb", ..], GNUSparse) => true,
(["rocksdb", ..], Regular) => true,
(["shreds", ..], Directory) => true,
(["shreds", ..], GNUSparse) => true,
(["shreds", ..], Regular) => true,
_ => false,
}
}
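
Archive validation now accepts a top-level shreds directory (and anything under it) alongside rocksdb, so downloaded ledger archives that carry the new shred files are not rejected. A quick illustrative check of the extended match, assuming entry paths arrive pre-split into components and that the example file names ("100", "CURRENT") merely stand in for real entries:

#[test]
fn accepts_shred_archive_entries() {
    use tar::EntryType::{Directory, Regular};
    assert!(is_valid_genesis_archive_entry(&["shreds"], Directory));
    assert!(is_valid_genesis_archive_entry(&["shreds", "100"], Regular));
    assert!(is_valid_genesis_archive_entry(&["rocksdb", "CURRENT"], Regular));
}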


@@ -5,7 +5,7 @@ use solana_client::rpc_client::RpcClient;
use solana_client::thin_client::create_client;
use solana_core::{
broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
gossip_service::discover_cluster, validator::ValidatorConfig,
contact_info::ContactInfo, gossip_service::discover_cluster, validator::ValidatorConfig,
};
use solana_download_utils::download_snapshot;
use solana_ledger::bank_forks::CompressionType;
@@ -274,7 +274,7 @@ fn run_cluster_partition(
for node in &cluster_nodes {
let node_client = RpcClient::new_socket(node.rpc);
if let Ok(epoch_info) = node_client.get_epoch_info() {
info!("slots_per_epoch: {:?}", epoch_info);
debug!("slots_per_epoch: {:?}", epoch_info);
if epoch_info.slots_in_epoch <= (1 << VOTE_THRESHOLD_DEPTH) {
reached_epoch = false;
break;
@@ -343,13 +343,16 @@ fn run_cluster_partition(
alive_node_contact_infos.len(),
)
.unwrap();
assert!(wait_for_new_roots(&alive_node_contact_infos, 1024, 16));
info!("PARTITION_TEST discovered {} nodes", cluster_nodes.len());
info!("PARTITION_TEST looking for new roots on all nodes");
let mut roots = vec![HashSet::new(); alive_node_contact_infos.len()];
let mut done = false;
}
pub fn wait_for_new_roots(nodes: &[ContactInfo], mut tries: usize, min_roots: usize) -> bool {
info!("looking for new roots on all nodes");
let mut roots = vec![HashSet::new(); nodes.len()];
let mut last_print = Instant::now();
while !done {
for (i, ingress_node) in alive_node_contact_infos.iter().enumerate() {
while tries > 0 {
for (i, ingress_node) in nodes.iter().enumerate() {
let client = create_client(
ingress_node.client_facing_addr(),
solana_core::cluster_info::VALIDATOR_PORT_RANGE,
@@ -358,14 +361,24 @@ fn run_cluster_partition(
roots[i].insert(slot);
let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0);
if last_print.elapsed().as_secs() > 3 {
info!("PARTITION_TEST min observed roots {}/16", min_node);
info!(
"{}: min observed roots {}/{} in {} nodes",
tries,
min_node,
min_roots,
roots.len()
);
last_print = Instant::now();
}
done = min_node >= 16;
if min_node >= min_roots {
return true;
}
}
sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2));
tries -= 1;
}
info!("PARTITION_TEST done waiting for roots");
info!("failed waiting for roots");
false
}
#[allow(unused_attributes)]
@@ -863,6 +876,7 @@ fn test_snapshot_download() {
#[test]
#[serial]
fn test_snapshot_restart_tower() {
solana_logger::setup();
// First set up the cluster with 2 nodes
let snapshot_interval_slots = 10;
let num_account_paths = 2;
@@ -920,12 +934,11 @@ fn test_snapshot_restart_tower() {
// Use the restarted node as the discovery point so that we get updated
// validator's ContactInfo
let restarted_node_info = cluster.get_contact_info(&validator_id).unwrap();
cluster_tests::spend_and_verify_all_nodes(
&restarted_node_info,
&cluster.funding_keypair,
1,
HashSet::new(),
);
let (cluster_nodes, _) =
discover_cluster(&restarted_node_info.gossip, cluster.validators.len()).unwrap();
assert!(wait_for_new_roots(&cluster_nodes, 512, 16));
}
#[test]