pass RestartableBucket through disk index (#33377)
This commit is contained in:
parent
fcddeb446b
commit
456563b9e9
|
@ -1,7 +1,7 @@
|
||||||
use {
|
use {
|
||||||
crate::{
|
crate::{
|
||||||
bucket::Bucket, bucket_item::BucketItem, bucket_map::BucketMapError,
|
bucket::Bucket, bucket_item::BucketItem, bucket_map::BucketMapError,
|
||||||
bucket_stats::BucketMapStats, MaxSearch, RefCount,
|
bucket_stats::BucketMapStats, restart::RestartableBucket, MaxSearch, RefCount,
|
||||||
},
|
},
|
||||||
solana_sdk::pubkey::Pubkey,
|
solana_sdk::pubkey::Pubkey,
|
||||||
std::{
|
std::{
|
||||||
|
@ -23,6 +23,11 @@ pub struct BucketApi<T: Clone + Copy + PartialEq + 'static> {
|
||||||
|
|
||||||
bucket: LockedBucket<T>,
|
bucket: LockedBucket<T>,
|
||||||
count: Arc<AtomicU64>,
|
count: Arc<AtomicU64>,
|
||||||
|
|
||||||
|
/// keeps track of which index file this bucket is currently using
|
||||||
|
/// or at startup, which bucket file this bucket should initially use
|
||||||
|
#[allow(dead_code)]
|
||||||
|
restartable_bucket: RestartableBucket,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Clone + Copy + PartialEq + std::fmt::Debug> BucketApi<T> {
|
impl<T: Clone + Copy + PartialEq + std::fmt::Debug> BucketApi<T> {
|
||||||
|
@ -30,6 +35,7 @@ impl<T: Clone + Copy + PartialEq + std::fmt::Debug> BucketApi<T> {
|
||||||
drives: Arc<Vec<PathBuf>>,
|
drives: Arc<Vec<PathBuf>>,
|
||||||
max_search: MaxSearch,
|
max_search: MaxSearch,
|
||||||
stats: Arc<BucketMapStats>,
|
stats: Arc<BucketMapStats>,
|
||||||
|
restartable_bucket: RestartableBucket,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
drives,
|
drives,
|
||||||
|
@ -37,6 +43,7 @@ impl<T: Clone + Copy + PartialEq + std::fmt::Debug> BucketApi<T> {
|
||||||
stats,
|
stats,
|
||||||
bucket: RwLock::default(),
|
bucket: RwLock::default(),
|
||||||
count: Arc::default(),
|
count: Arc::default(),
|
||||||
|
restartable_bucket,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,17 @@
|
||||||
//! BucketMap is a mostly contention free concurrent map backed by MmapMut
|
//! BucketMap is a mostly contention free concurrent map backed by MmapMut
|
||||||
|
|
||||||
use {
|
use {
|
||||||
crate::{bucket_api::BucketApi, bucket_stats::BucketMapStats, MaxSearch, RefCount},
|
crate::{
|
||||||
|
bucket_api::BucketApi, bucket_stats::BucketMapStats, restart::Restart, MaxSearch, RefCount,
|
||||||
|
},
|
||||||
solana_sdk::pubkey::Pubkey,
|
solana_sdk::pubkey::Pubkey,
|
||||||
std::{convert::TryInto, fmt::Debug, fs, path::PathBuf, sync::Arc},
|
std::{
|
||||||
|
convert::TryInto,
|
||||||
|
fmt::Debug,
|
||||||
|
fs::{self},
|
||||||
|
path::PathBuf,
|
||||||
|
sync::{Arc, Mutex},
|
||||||
|
},
|
||||||
tempfile::TempDir,
|
tempfile::TempDir,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -83,6 +91,11 @@ impl<T: Clone + Copy + Debug + PartialEq> BucketMap<T> {
|
||||||
if let Some(drives) = config.drives.as_ref() {
|
if let Some(drives) = config.drives.as_ref() {
|
||||||
Self::erase_previous_drives(drives);
|
Self::erase_previous_drives(drives);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let stats = Arc::default();
|
||||||
|
|
||||||
|
let restart = Restart::new(&config);
|
||||||
|
|
||||||
let mut temp_dir = None;
|
let mut temp_dir = None;
|
||||||
let drives = config.drives.unwrap_or_else(|| {
|
let drives = config.drives.unwrap_or_else(|| {
|
||||||
temp_dir = Some(TempDir::new().unwrap());
|
temp_dir = Some(TempDir::new().unwrap());
|
||||||
|
@ -90,13 +103,19 @@ impl<T: Clone + Copy + Debug + PartialEq> BucketMap<T> {
|
||||||
});
|
});
|
||||||
let drives = Arc::new(drives);
|
let drives = Arc::new(drives);
|
||||||
|
|
||||||
let stats = Arc::default();
|
let restart = restart.map(|restart| Arc::new(Mutex::new(restart)));
|
||||||
let buckets = (0..config.max_buckets)
|
|
||||||
.map(|_| {
|
let restartable_buckets =
|
||||||
|
Restart::get_restartable_buckets(restart.as_ref(), &drives, config.max_buckets);
|
||||||
|
|
||||||
|
let buckets = restartable_buckets
|
||||||
|
.into_iter()
|
||||||
|
.map(|restartable_bucket| {
|
||||||
Arc::new(BucketApi::new(
|
Arc::new(BucketApi::new(
|
||||||
Arc::clone(&drives),
|
Arc::clone(&drives),
|
||||||
max_search,
|
max_search,
|
||||||
Arc::clone(&stats),
|
Arc::clone(&stats),
|
||||||
|
restartable_bucket,
|
||||||
))
|
))
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
Loading…
Reference in New Issue