Avoid full-range compactions with periodic filtered b.g. ones (#16697)

* Update rocksdb to v0.16.0

* Promote the infrequent and important log to info!

* Force background compaction by ttl without manual compaction

* Fix test

* Support no compaction mode in test_ledger_cleanup_compaction

* Fix comment

* Make compaction_interval customizable

* Avoid major compaction with periodic filtering...

* Address lazy_static, special cfs and range check

* Clean up a bit and add comment

* Add comment

* More comments...

* Config code cleanup

* Add comment

* Use .conflicts_with()

* Nullify unneeded delete_range ops for special CFs

* Some clean ups

* Clarify the locking intention

* Ensure special CFs' consistency with PurgeType::CompactionFilter

* Fix comment

* Fix bad copy paste

* Fix various types...

* Don't use tuples

* Add a unit test for compaction_filter

* Fix typo...

* Remove flag and just use new behavior always

* Fix wrong condition negation...

* Doc. about no set_last_purged_slot in purge_slots

* Write a test and fix off-by-one bug....

* Apply suggestions from code review

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>

* Follow up to github review suggestions

* Fix line-wrapping

* Fix conflict

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
Ryo Onodera 2021-05-28 16:42:56 +09:00 committed by GitHub
parent 5787ac813f
commit 1f97b2365f
9 changed files with 644 additions and 137 deletions

Cargo.lock generated
View File

@ -261,26 +261,21 @@ dependencies = [
[[package]]
name = "bindgen"
version = "0.54.0"
version = "0.57.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66c0bb6167449588ff70803f4127f0684f9063097eca5016f37eb52b92c2cf36"
checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d"
dependencies = [
"bitflags",
"cexpr",
"cfg-if 0.1.10",
"clang-sys",
"clap",
"env_logger 0.7.1",
"lazy_static",
"lazycell",
"log 0.4.11",
"peeking_take_while",
"proc-macro2 1.0.24",
"quote 1.0.6",
"regex",
"rustc-hash",
"shlex",
"which",
]
[[package]]
@ -583,13 +578,13 @@ dependencies = [
[[package]]
name = "clang-sys"
version = "0.29.3"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe6837df1d5cba2397b835c8530f51723267e16abbf83892e9e5af4f0e5dd10a"
checksum = "853eda514c284c2287f4bf20ae614f8781f40a81d32ecda6e91449304dfe077c"
dependencies = [
"glob",
"libc",
"libloading 0.5.2",
"libloading 0.7.0",
]
[[package]]
@ -1198,19 +1193,6 @@ dependencies = [
"syn 1.0.60",
]
[[package]]
name = "env_logger"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36"
dependencies = [
"atty",
"humantime 1.3.0",
"log 0.4.11",
"regex",
"termcolor",
]
[[package]]
name = "env_logger"
version = "0.8.3"
@ -1218,7 +1200,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f"
dependencies = [
"atty",
"humantime 2.0.1",
"humantime",
"log 0.4.11",
"regex",
"termcolor",
@ -1778,15 +1760,6 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47"
[[package]]
name = "humantime"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
dependencies = [
"quick-error",
]
[[package]]
name = "humantime"
version = "2.0.1"
@ -2184,16 +2157,6 @@ version = "0.2.81"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb"
[[package]]
name = "libloading"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753"
dependencies = [
"cc",
"winapi 0.3.8",
]
[[package]]
name = "libloading"
version = "0.6.2"
@ -2204,10 +2167,20 @@ dependencies = [
]
[[package]]
name = "librocksdb-sys"
version = "6.11.4"
name = "libloading"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb5b56f651c204634b936be2f92dbb42c36867e00ff7fe2405591f3b9fa66f09"
checksum = "6f84d96438c15fcd6c3f244c8fce01d1e2b9c6b5623e9c711dc9286d8fc92d6a"
dependencies = [
"cfg-if 1.0.0",
"winapi 0.3.8",
]
[[package]]
name = "librocksdb-sys"
version = "6.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5da125e1c0f22c7cae785982115523a0738728498547f415c9054cb17c7e89f9"
dependencies = [
"bindgen",
"cc",
@ -3059,12 +3032,6 @@ dependencies = [
"percent-encoding 2.1.0",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "0.6.13"
@ -3461,9 +3428,9 @@ dependencies = [
[[package]]
name = "rocksdb"
version = "0.15.0"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23d83c02c429044d58474eaf5ae31e062d0de894e21125b47437ec0edc1397e6"
checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3"
dependencies = [
"libc",
"librocksdb-sys",
@ -4180,7 +4147,7 @@ dependencies = [
"criterion-stats",
"ctrlc",
"dirs-next",
"humantime 2.0.1",
"humantime",
"indicatif",
"log 0.4.11",
"num-traits",
@ -4234,7 +4201,7 @@ dependencies = [
"base64 0.13.0",
"chrono",
"console 0.14.1",
"humantime 2.0.1",
"humantime",
"indicatif",
"serde",
"serde_derive",
@ -4811,7 +4778,7 @@ version = "1.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fa7bddd7b89c26c6e3ef4af9b47d6bc8d60888559affb5160f5ade18c0cd058"
dependencies = [
"env_logger 0.8.3",
"env_logger",
"lazy_static",
"log 0.4.11",
]
@ -4820,7 +4787,7 @@ dependencies = [
name = "solana-logger"
version = "1.8.0"
dependencies = [
"env_logger 0.8.3",
"env_logger",
"lazy_static",
"log 0.4.11",
]
@ -4861,7 +4828,7 @@ dependencies = [
name = "solana-metrics"
version = "1.8.0"
dependencies = [
"env_logger 0.8.3",
"env_logger",
"gethostname",
"lazy_static",
"log 0.4.11",
@ -5555,7 +5522,7 @@ name = "solana-watchtower"
version = "1.8.0"
dependencies = [
"clap",
"humantime 2.0.1",
"humantime",
"log 0.4.11",
"solana-clap-utils",
"solana-cli-config",
@ -5803,7 +5770,7 @@ dependencies = [
"anyhow",
"fnv",
"futures 0.3.8",
"humantime 2.0.1",
"humantime",
"log 0.4.11",
"pin-project 1.0.1",
"rand 0.7.3",
@ -6849,15 +6816,6 @@ dependencies = [
"tokio-tls",
]
[[package]]
name = "which"
version = "3.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d011071ae14a2f6671d0b74080ae0cd8ebf3a6f8c9589a2cd45f23126fe29724"
dependencies = [
"libc",
]
[[package]]
name = "winapi"
version = "0.2.8"

View File

@ -207,11 +207,25 @@ impl LedgerCleanupService {
);
let mut purge_time = Measure::start("purge_slots");
blockstore.purge_slots(
purge_first_slot,
lowest_cleanup_slot,
PurgeType::PrimaryIndex,
PurgeType::CompactionFilter,
);
// Update only after the purge operation.
// Safety: This value can be used by compaction_filters shared via Arc<AtomicU64>.
// Compactions are async and run as a multi-threaded background job. However, this
// shouldn't cause consistency issues for iterators and getters because we have
// already expired all affected keys (older than or equal to lowest_cleanup_slot)
// by the above `purge_slots`. Thanks to the general RocksDB design, where SST
// files are immutable, even in-flight iterators aren't affected; the database grabs
// a snapshot of the live set of SST files at iterator creation.
// Also, we passed PurgeType::CompactionFilter, meaning no delete_range for the
// transaction_status and address_signatures CFs. These are fine because they
// don't require a strongly consistent view for their operation.
blockstore.set_max_expired_slot(lowest_cleanup_slot);
purge_time.stop();
info!("{}", purge_time);

View File

@ -1636,9 +1636,11 @@ mod tests {
}
drop(blockstore);
// this purges and compacts all slots greater than or equal to 5
backup_and_clear_blockstore(&blockstore_path, 5, 2);
let blockstore = Blockstore::open(&blockstore_path).unwrap();
// assert that slots less than 5 aren't affected
assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
for i in 5..10 {
assert!(blockstore

View File

@ -39,6 +39,8 @@ mod tests {
pub cleanup_blockstore: bool,
pub emit_cpu_info: bool,
pub assert_compaction: bool,
pub compaction_interval: Option<u64>,
pub no_compaction: bool,
}
#[derive(Clone, Copy, Debug)]
@ -154,6 +156,11 @@ mod tests {
let emit_cpu_info = read_env("EMIT_CPU_INFO", true);
// set default to `true` once compaction is merged
let assert_compaction = read_env("ASSERT_COMPACTION", false);
let compaction_interval = match read_env("COMPACTION_INTERVAL", 0) {
maybe_zero if maybe_zero == 0 => None,
non_zero => Some(non_zero),
};
let no_compaction = read_env("NO_COMPACTION", false);
BenchmarkConfig {
benchmark_slots,
@ -166,6 +173,8 @@ mod tests {
cleanup_blockstore,
emit_cpu_info,
assert_compaction,
compaction_interval,
no_compaction,
}
}
@ -211,8 +220,13 @@ mod tests {
fn test_ledger_cleanup_compaction() {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let mut blockstore = Blockstore::open(&blockstore_path).unwrap();
let config = get_benchmark_config();
if config.no_compaction {
blockstore.set_no_compaction(true);
}
let blockstore = Arc::new(blockstore);
eprintln!("BENCHMARK CONFIG: {:?}", config);
eprintln!("LEDGER_PATH: {:?}", &blockstore_path);
@ -223,6 +237,8 @@ mod tests {
let stop_size_bytes = config.stop_size_bytes;
let stop_size_iterations = config.stop_size_iterations;
let pre_generate_data = config.pre_generate_data;
let compaction_interval = config.compaction_interval;
let batches = benchmark_slots / batch_size;
let (sender, receiver) = channel();
@ -232,7 +248,7 @@ mod tests {
blockstore.clone(),
max_ledger_shreds,
&exit,
None,
compaction_interval,
None,
);
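
The COMPACTION_INTERVAL mapping above treats 0 (the read_env default) as unset. A minimal sketch of the same convention, with a hypothetical standalone helper in place of the test's read_env:

// 0 means "use LedgerCleanupService's default interval" (None); any non-zero
// value is forwarded as Some(seconds).
fn compaction_interval_from(value: u64) -> Option<u64> {
    if value == 0 {
        None
    } else {
        Some(value)
    }
}

assert_eq!(compaction_interval_from(0), None);
assert_eq!(compaction_interval_from(300), Some(300));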

View File

@ -59,7 +59,7 @@ trees = "0.2.1"
[dependencies.rocksdb]
# Avoid the vendored bzip2 within rocksdb-sys that can cause linker conflicts
# when also using the bzip2 crate
version = "0.15.0"
version = "0.16.0"
default-features = false
features = ["lz4"]

View File

@ -54,7 +54,7 @@ use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
Arc, Mutex, RwLock,
Arc, Mutex, RwLock, RwLockWriteGuard,
},
};
use thiserror::Error;
@ -92,6 +92,7 @@ type CompletedRanges = Vec<(u32, u32)>;
pub enum PurgeType {
Exact,
PrimaryIndex,
CompactionFilter,
}
#[derive(Error, Debug)]
@ -144,7 +145,7 @@ pub struct Blockstore {
insert_shreds_lock: Arc<Mutex<()>>,
pub new_shreds_signals: Vec<SyncSender<bool>>,
pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
pub lowest_cleanup_slot: Arc<RwLock<u64>>,
pub lowest_cleanup_slot: Arc<RwLock<Slot>>,
no_compaction: bool,
}
@ -1956,18 +1957,24 @@ impl Blockstore {
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
Ok(None)
} else {
let result = if index0.frozen && to_slot > index0.max_slot {
debug!("Pruning transaction index 0 at slot {}", index0.max_slot);
let purge_target_primary_index = if index0.frozen && to_slot > index0.max_slot {
info!(
"Pruning expired primary index 0 up to slot {} (max requested: {})",
index0.max_slot, to_slot
);
Some(0)
} else if index1.frozen && to_slot > index1.max_slot {
debug!("Pruning transaction index 1 at slot {}", index1.max_slot);
info!(
"Pruning expired primary index 1 up to slot {} (max requested: {})",
index1.max_slot, to_slot
);
Some(1)
} else {
None
};
if result.is_some() {
*w_active_transaction_status_index = if index0.frozen { 0 } else { 1 };
if let Some(purge_target_primary_index) = purge_target_primary_index {
*w_active_transaction_status_index = purge_target_primary_index;
if index0.frozen {
index0.max_slot = 0
};
@ -1980,16 +1987,17 @@ impl Blockstore {
batch.put::<cf::TransactionStatusIndex>(1, &index1)?;
}
Ok(result)
Ok(purge_target_primary_index)
}
}
fn get_primary_index(
fn get_primary_index_to_write(
&self,
slot: Slot,
w_active_transaction_status_index: &mut u64,
// take WriteGuard to require critical section semantics at call site
w_active_transaction_status_index: &RwLockWriteGuard<Slot>,
) -> Result<u64> {
let i = *w_active_transaction_status_index;
let i = **w_active_transaction_status_index;
let mut index_meta = self.transaction_status_index_cf.get(i)?.unwrap();
if slot > index_meta.max_slot {
assert!(!index_meta.frozen);
@ -2028,9 +2036,10 @@ impl Blockstore {
let status = status.into();
// This write lock prevents interleaving issues with the transaction_status_index_cf by gating
// writes to that column
let mut w_active_transaction_status_index =
let w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
let primary_index = self.get_primary_index(slot, &mut w_active_transaction_status_index)?;
let primary_index =
self.get_primary_index_to_write(slot, &w_active_transaction_status_index)?;
self.transaction_status_cf
.put_protobuf((primary_index, signature, slot), &status)?;
for address in writable_keys {
@ -2048,6 +2057,21 @@ impl Blockstore {
Ok(())
}
fn ensure_lowest_cleanup_slot(&self) -> (std::sync::RwLockReadGuard<Slot>, Slot) {
// Ensures a consistent result by using lowest_cleanup_slot as the lower bound
// when reading columns that do not maintain a strongly consistent view via
// slot-based delete_range
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
let lowest_available_slot = (*lowest_cleanup_slot)
.checked_add(1)
.expect("overflow from trusted value");
// The caller must hold this lock properly; otherwise LedgerCleanupService could
// purge/compact needed slots here at any given moment.
// Blockstore callers, like RPC, can still process concurrent read queries
(lowest_cleanup_slot, lowest_available_slot)
}
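
A minimal usage sketch of the guard pattern above (a hypothetical method body; the real callers are the getters below): the returned read guard must live across the entire read, which blocks LedgerCleanupService from taking the write lock and purging the slots being scanned.

// Hypothetical Blockstore method (ensure_lowest_cleanup_slot is private, so
// real callers are other Blockstore methods like the getters below):
fn scan_special_cf_consistently(&self) {
    let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot();
    // ... iterate a special CF starting at lowest_available_slot; while `lock`
    // is alive, lowest_cleanup_slot cannot advance underneath us ...
    let _ = lowest_available_slot;
    drop(lock); // release only after the scan is fully done
}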
// Returns a transaction status, as well as a loop counter for unit testing
fn get_transaction_status_with_counter(
&self,
@ -2055,9 +2079,15 @@ impl Blockstore {
confirmed_unrooted_slots: &[Slot],
) -> Result<(Option<(Slot, TransactionStatusMeta)>, u64)> {
let mut counter = 0;
let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot();
for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.transaction_status_cf.iter(IteratorMode::From(
(transaction_status_cf_primary_index, signature, 0),
(
transaction_status_cf_primary_index,
signature,
lowest_available_slot,
),
IteratorDirection::Forward,
))?;
for ((i, sig, slot), _data) in index_iterator {
@ -2076,6 +2106,8 @@ impl Blockstore {
return Ok((status, counter));
}
}
drop(lock);
Ok((None, counter))
}
@ -2199,13 +2231,15 @@ impl Blockstore {
start_slot: Slot,
end_slot: Slot,
) -> Result<Vec<(Slot, Signature)>> {
let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot();
let mut signatures: Vec<(Slot, Signature)> = vec![];
for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(
transaction_status_cf_primary_index,
pubkey,
start_slot,
start_slot.max(lowest_available_slot),
Signature::default(),
),
IteratorDirection::Forward,
@ -2220,6 +2254,7 @@ impl Blockstore {
}
}
}
drop(lock);
signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1)));
Ok(signatures)
}
@ -2232,13 +2267,14 @@ impl Blockstore {
pubkey: Pubkey,
slot: Slot,
) -> Result<Vec<(Slot, Signature)>> {
let (lock, lowest_available_slot) = self.ensure_lowest_cleanup_slot();
let mut signatures: Vec<(Slot, Signature)> = vec![];
for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(
transaction_status_cf_primary_index,
pubkey,
slot,
slot.max(lowest_available_slot),
Signature::default(),
),
IteratorDirection::Forward,
@ -2253,6 +2289,7 @@ impl Blockstore {
signatures.push((slot, signature));
}
}
drop(lock);
signatures.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap().then(a.1.cmp(&b.1)));
Ok(signatures)
}
@ -6676,6 +6713,176 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
fn do_test_lowest_cleanup_slot_and_special_cfs(
simulate_compaction: bool,
simulate_ledger_cleanup_service: bool,
) {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
// TransactionStatus column opens initialized with one entry at index 2
let transaction_status_cf = blockstore.db.column::<cf::TransactionStatus>();
let pre_balances_vec = vec![1, 2, 3];
let post_balances_vec = vec![3, 2, 1];
let status = TransactionStatusMeta {
status: solana_sdk::transaction::Result::<()>::Ok(()),
fee: 42u64,
pre_balances: pre_balances_vec,
post_balances: post_balances_vec,
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
rewards: Some(vec![]),
}
.into();
let signature1 = Signature::new(&[2u8; 64]);
let signature2 = Signature::new(&[3u8; 64]);
// Insert rooted slots 0..=3 with no fork
let meta0 = SlotMeta::new(0, 0);
blockstore.meta_cf.put(0, &meta0).unwrap();
let meta1 = SlotMeta::new(1, 0);
blockstore.meta_cf.put(1, &meta1).unwrap();
let meta2 = SlotMeta::new(2, 1);
blockstore.meta_cf.put(2, &meta2).unwrap();
let meta3 = SlotMeta::new(3, 2);
blockstore.meta_cf.put(3, &meta3).unwrap();
blockstore.set_roots(&[0, 1, 2, 3]).unwrap();
let lowest_cleanup_slot = 1;
let lowest_available_slot = lowest_cleanup_slot + 1;
transaction_status_cf
.put_protobuf((0, signature1, lowest_cleanup_slot), &status)
.unwrap();
transaction_status_cf
.put_protobuf((0, signature2, lowest_available_slot), &status)
.unwrap();
let address0 = solana_sdk::pubkey::new_rand();
let address1 = solana_sdk::pubkey::new_rand();
blockstore
.write_transaction_status(
lowest_cleanup_slot,
signature1,
vec![&address0],
vec![],
TransactionStatusMeta::default(),
)
.unwrap();
blockstore
.write_transaction_status(
lowest_available_slot,
signature2,
vec![&address1],
vec![],
TransactionStatusMeta::default(),
)
.unwrap();
let check_for_missing = || {
(
blockstore
.get_transaction_status_with_counter(signature1, &[])
.unwrap()
.0
.is_none(),
blockstore
.find_address_signatures_for_slot(address0, lowest_cleanup_slot)
.unwrap()
.is_empty(),
blockstore
.find_address_signatures(address0, lowest_cleanup_slot, lowest_cleanup_slot)
.unwrap()
.is_empty(),
)
};
let assert_existing_always = || {
let are_existing_always = (
blockstore
.get_transaction_status_with_counter(signature2, &[])
.unwrap()
.0
.is_some(),
!blockstore
.find_address_signatures_for_slot(address1, lowest_available_slot)
.unwrap()
.is_empty(),
!blockstore
.find_address_signatures(
address1,
lowest_available_slot,
lowest_available_slot,
)
.unwrap()
.is_empty(),
);
assert_eq!(are_existing_always, (true, true, true));
};
let are_missing = check_for_missing();
// should never be missing before the conditional compaction & simulation...
assert_eq!(are_missing, (false, false, false));
assert_existing_always();
if simulate_compaction {
blockstore.set_max_expired_slot(lowest_cleanup_slot);
// force compaction filters to run across the whole key range.
blockstore
.compact_storage(Slot::min_value(), Slot::max_value())
.unwrap();
}
if simulate_ledger_cleanup_service {
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
}
let are_missing = check_for_missing();
if simulate_compaction || simulate_ledger_cleanup_service {
// ... when either simulation (or both) is in effect, we should consistently
// observe the entries to be missing
assert_eq!(are_missing, (true, true, true));
} else {
// ... otherwise, we should observe the entries still existing...
assert_eq!(are_missing, (false, false, false));
}
assert_existing_always();
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_with_compact_with_ledger_cleanup_service_simulation(
) {
do_test_lowest_cleanup_slot_and_special_cfs(true, true);
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_with_compact_without_ledger_cleanup_service_simulation(
) {
do_test_lowest_cleanup_slot_and_special_cfs(true, false);
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_without_compact_with_ledger_cleanup_service_simulation(
) {
do_test_lowest_cleanup_slot_and_special_cfs(false, true);
}
#[test]
fn test_lowest_cleanup_slot_and_special_cfs_without_compact_without_ledger_cleanup_service_simulation(
) {
do_test_lowest_cleanup_slot_and_special_cfs(false, false);
}
#[test]
fn test_get_rooted_transaction() {
let slot = 2;

View File

@ -32,6 +32,19 @@ impl Blockstore {
}
}
/// Usually this is paired with .purge_slots() but we can't call this unconditionally
/// inside that function. That's because set_max_expired_slot() expects slots to be
/// purged in successive chronological order, while .purge_slots() can also be used
/// to purge *future* slots for the --hard-fork case, preserving older slots. It'd
/// be quite dangerous to purge older slots in that case.
/// So, the only legal caller of this function at present is LedgerCleanupService.
pub fn set_max_expired_slot(&self, to_slot: Slot) {
// Convert here from the inclusive end of the purged range to the inclusive start
// of the alive range, aligning with Slot::default() so the initial compaction
// filter behavior stays consistent
let to_slot = to_slot.checked_add(1).unwrap();
self.db.set_oldest_slot(to_slot);
}
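
A worked example of the off-by-one conversion above, with illustrative values: after purging slots 0..=10, slot 11 is the oldest slot still alive, and 11 is what the compaction filters compare keys against.

// Illustrative call: stores oldest_slot = 11 internally, so filters keep keys
// with slot >= 11 and remove keys for the purged slots 0..=10.
blockstore.set_max_expired_slot(10);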
pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) {
self.purge_slots(from_slot, to_slot, PurgeType::Exact);
if let Err(e) = self.compact_storage(from_slot, to_slot) {
@ -180,6 +193,13 @@ impl Blockstore {
to_slot,
)?;
}
PurgeType::CompactionFilter => {
// No explicit action is required here because this purge type relies completely
// and indefinitely on the compaction filter working properly for these special
// column families, never toggling the primary index away from the current one.
// Overall, this keeps their writes uniformly distributed, with no spiky periodic
// huge delete_range for them.
}
}
delete_range_timer.stop();
let mut write_timer = Measure::start("write_batch");
@ -193,6 +213,10 @@ impl Blockstore {
write_timer.stop();
purge_stats.delete_range += delete_range_timer.as_us();
purge_stats.write_batch += write_timer.as_us();
// only drop w_active_transaction_status_index after we do db.write(write_batch);
// otherwise, readers might be confused by inconsistent state between
// self.active_transaction_status_index and RocksDB's TransactionStatusIndex contents
drop(w_active_transaction_status_index);
Ok(columns_purged)
}
@ -323,18 +347,26 @@ impl Blockstore {
w_active_transaction_status_index: &mut u64,
to_slot: Slot,
) -> Result<()> {
if let Some(index) = self.toggle_transaction_status_index(
if let Some(purged_index) = self.toggle_transaction_status_index(
write_batch,
w_active_transaction_status_index,
to_slot,
)? {
*columns_purged &= self
.db
.delete_range_cf::<cf::TransactionStatus>(write_batch, index, index + 1)
.delete_range_cf::<cf::TransactionStatus>(
write_batch,
purged_index,
purged_index + 1,
)
.is_ok()
& self
.db
.delete_range_cf::<cf::AddressSignatures>(write_batch, index, index + 1)
.delete_range_cf::<cf::AddressSignatures>(
write_batch,
purged_index,
purged_index + 1,
)
.is_ok();
}
Ok(())

View File

@ -5,9 +5,13 @@ use log::*;
use prost::Message;
pub use rocksdb::Direction as IteratorDirection;
use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, DBRecoveryMode,
IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
self,
compaction_filter::CompactionFilter,
compaction_filter_factory::{CompactionFilterContext, CompactionFilterFactory},
ColumnFamily, ColumnFamilyDescriptor, CompactionDecision, DBIterator, DBRawIterator,
DBRecoveryMode, IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
};
use serde::de::DeserializeOwned;
use serde::Serialize;
use solana_runtime::hardened_unpack::UnpackError;
@ -17,7 +21,17 @@ use solana_sdk::{
signature::Signature,
};
use solana_storage_proto::convert::generated;
use std::{collections::HashMap, fs, marker::PhantomData, path::Path, sync::Arc};
use std::{
collections::HashMap,
ffi::{CStr, CString},
fs,
marker::PhantomData,
path::Path,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use thiserror::Error;
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
@ -58,6 +72,9 @@ const PERF_SAMPLES_CF: &str = "perf_samples";
/// Column family for BlockHeight
const BLOCK_HEIGHT_CF: &str = "block_height";
// 1 day is chosen for the same reason as DEFAULT_COMPACTION_SLOT_INTERVAL
const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24;
#[derive(Error, Debug)]
pub enum BlockstoreError {
ShredForIndexExists,
@ -208,8 +225,30 @@ impl From<BlockstoreRecoveryMode> for DBRecoveryMode {
}
}
#[derive(Default, Clone, Debug)]
struct OldestSlot(Arc<AtomicU64>);
impl OldestSlot {
pub fn set(&self, oldest_slot: Slot) {
// This is used by compaction_filter independently, without any data dependency.
// Also, compaction_filters are created via their factories, taking short-lived
// copies of this atomic value for a single compaction job. So, a Relaxed store
// can be justified in total
self.0.store(oldest_slot, Ordering::Relaxed);
}
pub fn get(&self) -> Slot {
// Copy out of the AtomicU64 as a general precaution so that oldest_slot cannot
// mutate across a single run of compaction, for simpler reasoning, although this
// isn't a strict requirement at the moment.
// Also, an eventually-propagated (very Relaxed) load is OK, because compaction by
// nature doesn't require strictly synchronized semantics in this regard
self.0.load(Ordering::Relaxed)
}
}
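
A small sketch of the intended handoff, run on one thread here for determinism (in the validator, set() is called from LedgerCleanupService while get() runs on RocksDB's background compaction threads):

// Clones share the same Arc<AtomicU64>, so a set() through one handle is
// observed (eventually, under Relaxed ordering) by get() through another.
let writer_side = OldestSlot::default();
let reader_side = writer_side.clone();
writer_side.set(42);
assert_eq!(reader_side.get(), 42); // same-thread, so the store is already visible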
#[derive(Debug)]
struct Rocks(rocksdb::DB, ActualAccessType);
struct Rocks(rocksdb::DB, ActualAccessType, OldestSlot);
impl Rocks {
fn open(
@ -234,39 +273,73 @@ impl Rocks {
db_options.set_wal_recovery_mode(recovery_mode.into());
}
let oldest_slot = OldestSlot::default();
// Column family names
let meta_cf_descriptor =
ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(&access_type));
let dead_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(&access_type));
let duplicate_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options(&access_type));
let erasure_meta_cf_descriptor =
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(&access_type));
let orphans_cf_descriptor =
ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(&access_type));
let root_cf_descriptor =
ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(&access_type));
let index_cf_descriptor =
ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(&access_type));
let shred_data_cf_descriptor =
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options(&access_type));
let shred_code_cf_descriptor =
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options(&access_type));
let transaction_status_cf_descriptor =
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options(&access_type));
let address_signatures_cf_descriptor =
ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options(&access_type));
let transaction_status_index_cf_descriptor =
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options(&access_type));
let rewards_cf_descriptor =
ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options(&access_type));
let blocktime_cf_descriptor =
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options(&access_type));
let perf_samples_cf_descriptor =
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options(&access_type));
let block_height_cf_descriptor =
ColumnFamilyDescriptor::new(BlockHeight::NAME, get_cf_options(&access_type));
let meta_cf_descriptor = ColumnFamilyDescriptor::new(
SlotMeta::NAME,
get_cf_options::<SlotMeta>(&access_type, &oldest_slot),
);
let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(
DeadSlots::NAME,
get_cf_options::<DeadSlots>(&access_type, &oldest_slot),
);
let duplicate_slots_cf_descriptor = ColumnFamilyDescriptor::new(
DuplicateSlots::NAME,
get_cf_options::<DuplicateSlots>(&access_type, &oldest_slot),
);
let erasure_meta_cf_descriptor = ColumnFamilyDescriptor::new(
ErasureMeta::NAME,
get_cf_options::<ErasureMeta>(&access_type, &oldest_slot),
);
let orphans_cf_descriptor = ColumnFamilyDescriptor::new(
Orphans::NAME,
get_cf_options::<Orphans>(&access_type, &oldest_slot),
);
let root_cf_descriptor = ColumnFamilyDescriptor::new(
Root::NAME,
get_cf_options::<Root>(&access_type, &oldest_slot),
);
let index_cf_descriptor = ColumnFamilyDescriptor::new(
Index::NAME,
get_cf_options::<Index>(&access_type, &oldest_slot),
);
let shred_data_cf_descriptor = ColumnFamilyDescriptor::new(
ShredData::NAME,
get_cf_options::<ShredData>(&access_type, &oldest_slot),
);
let shred_code_cf_descriptor = ColumnFamilyDescriptor::new(
ShredCode::NAME,
get_cf_options::<ShredCode>(&access_type, &oldest_slot),
);
let transaction_status_cf_descriptor = ColumnFamilyDescriptor::new(
TransactionStatus::NAME,
get_cf_options::<TransactionStatus>(&access_type, &oldest_slot),
);
let address_signatures_cf_descriptor = ColumnFamilyDescriptor::new(
AddressSignatures::NAME,
get_cf_options::<AddressSignatures>(&access_type, &oldest_slot),
);
let transaction_status_index_cf_descriptor = ColumnFamilyDescriptor::new(
TransactionStatusIndex::NAME,
get_cf_options::<TransactionStatusIndex>(&access_type, &oldest_slot),
);
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(
Rewards::NAME,
get_cf_options::<Rewards>(&access_type, &oldest_slot),
);
let blocktime_cf_descriptor = ColumnFamilyDescriptor::new(
Blocktime::NAME,
get_cf_options::<Blocktime>(&access_type, &oldest_slot),
);
let perf_samples_cf_descriptor = ColumnFamilyDescriptor::new(
PerfSamples::NAME,
get_cf_options::<PerfSamples>(&access_type, &oldest_slot),
);
let block_height_cf_descriptor = ColumnFamilyDescriptor::new(
BlockHeight::NAME,
get_cf_options::<BlockHeight>(&access_type, &oldest_slot),
);
let cfs = vec![
(SlotMeta::NAME, meta_cf_descriptor),
@ -289,18 +362,18 @@ impl Rocks {
(PerfSamples::NAME, perf_samples_cf_descriptor),
(BlockHeight::NAME, block_height_cf_descriptor),
];
let cf_names: Vec<_> = cfs.iter().map(|c| c.0).collect();
// Open the database
let db = match access_type {
AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks(
DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?,
ActualAccessType::Primary,
oldest_slot,
),
AccessType::TryPrimaryThenSecondary => {
let names: Vec<_> = cfs.iter().map(|c| c.0).collect();
match DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1)) {
Ok(db) => Rocks(db, ActualAccessType::Primary),
Ok(db) => Rocks(db, ActualAccessType::Primary, oldest_slot),
Err(err) => {
let secondary_path = path.join("solana-secondary");
@ -312,13 +385,75 @@ impl Rocks {
db_options.set_max_open_files(-1);
Rocks(
DB::open_cf_as_secondary(&db_options, path, &secondary_path, names)?,
DB::open_cf_as_secondary(
&db_options,
path,
&secondary_path,
cf_names.clone(),
)?,
ActualAccessType::Secondary,
oldest_slot,
)
}
}
}
};
// This is only needed for LedgerCleanupService, so guard it with PrimaryOnly (i.e. running solana-validator)
if matches!(access_type, AccessType::PrimaryOnly) {
for cf_name in cf_names {
// this special column family must be excluded from LedgerCleanupService's rocksdb
// compactions
if cf_name == TransactionStatusIndex::NAME {
continue;
}
// This is the crux of our write-stall-free storage cleaning strategy with a
// consistent state view for higher layers.
//
// For the consistent view, we commit a delete_range on the pruned slot range via
// LedgerCleanupService. Simple story here.
//
// For the actual storage cleaning, we employ RocksDB compaction. But the default
// RocksDB compaction settings don't work well for us, because we use the database
// rather like a really big (100s of GB) ring buffer. RocksDB basically assumes
// uniform data writes over the key space for efficient compaction, which isn't
// true for our ring-buffer-like use.
//
// So, we customize the compaction strategy with 2 combined tweaks:
// (1) a compaction_filter and (2) a shortened periodic compaction cycle.
//
// Via the compaction_filter, we finally reclaim the storage occupied by pruned
// slots that was previously only delete_range()-ed. With a compaction_filter set,
// each SST file is periodically re-compacted to hunt for keys newly expired under
// the re-evaluated filter. But RocksDB's default `periodic_compaction_seconds` is
// 30 days, which is too long for our case. So, we shorten it to a day (24 hours).
//
// As we write newer SST files over time at a fairly consistent rate, this
// effectively makes each newly-created SST be re-compacted for the filter at
// well-dispersed timings. As a whole, we rewrite the entire dataset every
// PERIODIC_COMPACTION_SECONDS, spread slowly over that same duration; the rewrite
// cost is thus amortized.
// There is a bit of inefficiency here, because we rewrite not-so-old SST files
// too. But a longer period would introduce a higher variance in ledger storage
// size over time. And it's much better than the daily IO spike caused by the
// compact_range() of the previous implementation.
//
// `ttl` plus `compact_range` (`ManualCompaction`) doesn't work nicely either.
// That's because manual compaction was originally intended to merge
// delete_range()s in order to reclaim disk space, so it merges them with N+1 SST
// files all the way down to the bottommost SSTs, often invalidating a vastly
// large number (= all) of SST files when combined with newer writes happening at
// the opposite edge of the key space. This causes long and heavy disk IO, a
// possible write stall, and ultimately the deadly Replay/Banking stage stall at
// higher layers.
db.0.set_options_cf(
db.cf_handle(cf_name),
&[(
"periodic_compaction_seconds",
&format!("{}", PERIODIC_COMPACTION_SECONDS),
)],
)
.unwrap();
}
}
Ok(db)
}
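
Back-of-the-envelope arithmetic for the amortization described in the comment above (the 100 GB figure is the comment's own ballpark; everything else is illustrative):

// Illustrative only: steady-state rewrite bandwidth implied by rewriting the
// whole ~100 GB dataset once per PERIODIC_COMPACTION_SECONDS (one day).
const DATASET_BYTES: u64 = 100 * 1024 * 1024 * 1024;
const REWRITE_BYTES_PER_SEC: u64 = DATASET_BYTES / PERIODIC_COMPACTION_SECONDS; // ~1.2 MB/s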
@ -415,9 +550,13 @@ pub trait Column {
fn key(index: Self::Index) -> Vec<u8>;
fn index(key: &[u8]) -> Self::Index;
fn primary_index(index: Self::Index) -> Slot;
// this returns a Slot or some other u64
fn primary_index(index: Self::Index) -> u64;
#[allow(clippy::wrong_self_convention)]
fn as_index(slot: Slot) -> Self::Index;
fn slot(index: Self::Index) -> Slot {
Self::primary_index(index)
}
}
pub trait ColumnName {
@ -491,6 +630,10 @@ impl Column for columns::TransactionStatus {
index.0
}
fn slot(index: Self::Index) -> Slot {
index.2
}
#[allow(clippy::wrong_self_convention)]
fn as_index(index: u64) -> Self::Index {
(index, Signature::default(), 0)
@ -528,6 +671,10 @@ impl Column for columns::AddressSignatures {
index.0
}
fn slot(index: Self::Index) -> Slot {
index.2
}
#[allow(clippy::wrong_self_convention)]
fn as_index(index: u64) -> Self::Index {
(index, Pubkey::default(), 0, Signature::default())
@ -555,6 +702,10 @@ impl Column for columns::TransactionStatusIndex {
index
}
fn slot(_index: Self::Index) -> Slot {
unimplemented!()
}
#[allow(clippy::wrong_self_convention)]
fn as_index(slot: u64) -> u64 {
slot
@ -855,6 +1006,10 @@ impl Database {
pub fn is_primary_access(&self) -> bool {
self.backend.is_primary_access()
}
pub fn set_oldest_slot(&self, oldest_slot: Slot) {
self.backend.2.set(oldest_slot);
}
}
impl<C> LedgerColumn<C>
@ -1032,7 +1187,63 @@ impl<'a> WriteBatch<'a> {
}
}
fn get_cf_options(access_type: &AccessType) -> Options {
struct PurgedSlotFilter<C: Column + ColumnName> {
oldest_slot: Slot,
name: CString,
_phantom: PhantomData<C>,
}
impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
fn filter(&mut self, _level: u32, key: &[u8], _value: &[u8]) -> CompactionDecision {
use rocksdb::CompactionDecision::*;
let slot_in_key = C::slot(C::index(key));
// Refer to the comment about periodic_compaction_seconds above, especially
// regarding the implicit periodic execution of compaction_filters
if slot_in_key >= self.oldest_slot {
Keep
} else {
Remove
}
}
fn name(&self) -> &CStr {
&self.name
}
}
struct PurgedSlotFilterFactory<C: Column + ColumnName> {
oldest_slot: OldestSlot,
name: CString,
_phantom: PhantomData<C>,
}
impl<C: Column + ColumnName> CompactionFilterFactory for PurgedSlotFilterFactory<C> {
type Filter = PurgedSlotFilter<C>;
fn create(&mut self, _context: CompactionFilterContext) -> Self::Filter {
let copied_oldest_slot = self.oldest_slot.get();
PurgedSlotFilter::<C> {
oldest_slot: copied_oldest_slot,
name: CString::new(format!(
"purged_slot_filter({}, {:?})",
C::NAME,
copied_oldest_slot
))
.unwrap(),
_phantom: PhantomData::default(),
}
}
fn name(&self) -> &CStr {
&self.name
}
}
fn get_cf_options<C: 'static + Column + ColumnName>(
access_type: &AccessType,
oldest_slot: &OldestSlot,
) -> Options {
let mut options = Options::default();
// 256 * 8 = 2GB. 6 of these columns should take at most 12GB of RAM
options.set_max_write_buffer_number(8);
@ -1046,6 +1257,19 @@ fn get_cf_options(access_type: &AccessType) -> Options {
options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32);
options.set_max_bytes_for_level_base(total_size_base);
options.set_target_file_size_base(file_size_base);
// TransactionStatusIndex must be excluded from LedgerCleanupService's rocksdb
// compactions....
if matches!(access_type, AccessType::PrimaryOnly)
&& C::NAME != columns::TransactionStatusIndex::NAME
{
options.set_compaction_filter_factory(PurgedSlotFilterFactory::<C> {
oldest_slot: oldest_slot.clone(),
name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(),
_phantom: PhantomData::default(),
});
}
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
options.set_disable_auto_compactions(true);
}
@ -1077,3 +1301,57 @@ fn get_db_options(access_type: &AccessType) -> Options {
options
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::blockstore_db::columns::ShredData;
#[test]
fn test_compaction_filter() {
// this doesn't implement Clone...
let dummy_compaction_filter_context = || CompactionFilterContext {
is_full_compaction: true,
is_manual_compaction: true,
};
let oldest_slot = OldestSlot::default();
let mut factory = PurgedSlotFilterFactory::<ShredData> {
oldest_slot: oldest_slot.clone(),
name: CString::new("test compaction filter").unwrap(),
_phantom: PhantomData::default(),
};
let mut compaction_filter = factory.create(dummy_compaction_filter_context());
let dummy_level = 0;
let key = ShredData::key(ShredData::as_index(0));
let dummy_value = vec![];
// we can't use assert_matches! because CompactionDecision doesn't implement Debug
assert!(matches!(
compaction_filter.filter(dummy_level, &key, &dummy_value),
CompactionDecision::Keep
));
// mutating oldest_slot doesn't affect existing compaction filters...
oldest_slot.set(1);
assert!(matches!(
compaction_filter.filter(dummy_level, &key, &dummy_value),
CompactionDecision::Keep
));
// recreating the compaction filter starts to expire the key
let mut compaction_filter = factory.create(dummy_compaction_filter_context());
assert!(matches!(
compaction_filter.filter(dummy_level, &key, &dummy_value),
CompactionDecision::Remove
));
// newer key shouldn't be removed
let key = ShredData::key(ShredData::as_index(1));
assert!(matches!(
compaction_filter.filter(dummy_level, &key, &dummy_value),
CompactionDecision::Keep
));
}
}

View File

@ -1500,7 +1500,7 @@ pub fn main() {
Arg::with_name("no_rocksdb_compaction")
.long("no-rocksdb-compaction")
.takes_value(false)
.help("Disable manual compaction of the ledger database. May increase storage requirements.")
.help("Disable manual compaction of the ledger database (this is ignored).")
)
.arg(
Arg::with_name("rocksdb_compaction_interval")
@ -2016,7 +2016,7 @@ pub fn main() {
let private_rpc = matches.is_present("private_rpc");
let no_port_check = matches.is_present("no_port_check");
let no_rocksdb_compaction = matches.is_present("no_rocksdb_compaction");
let no_rocksdb_compaction = true;
let rocksdb_compaction_interval = value_t!(matches, "rocksdb_compaction_interval", u64).ok();
let rocksdb_max_compaction_jitter =
value_t!(matches, "rocksdb_max_compaction_jitter", u64).ok();