refactor SharedBuffer to separate Arc refcount shutdown issues (#18563)
This commit is contained in:
parent 5cea25ac3e
commit 3e11468a04
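Why the refactor: the background reader thread used to capture `instance_`, a clone of the `Arc<SharedBufferInternal>`, so the Arc's strong count stayed above zero for as long as the thread ran, and the `Drop` impl that sets `stop` and joins the thread could never fire: a shutdown hang. This commit splits the state the thread actually needs into `SharedBufferBgReader` behind its own `Arc`, so the thread holds no refcount on `instance`. Below is a minimal self-contained sketch of the pattern, not code from this commit; `Owner` and `BgState` are illustrative names.

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
    time::Duration,
};

// Only the state the worker thread actually needs, playing the role
// that SharedBufferBgReader plays in this commit.
struct BgState {
    stop: AtomicBool,
}

struct Owner {
    bg_state: Arc<BgState>,
    join_handle: Option<thread::JoinHandle<()>>,
}

impl Owner {
    fn new() -> Self {
        let bg_state = Arc::new(BgState {
            stop: AtomicBool::new(false),
        });
        // The thread captures only Arc<BgState>, NOT an Arc of Owner itself,
        // so dropping the last handle to Owner can actually run Drop.
        let for_thread = Arc::clone(&bg_state);
        let join_handle = Some(thread::spawn(move || {
            while !for_thread.stop.load(Ordering::Relaxed) {
                thread::sleep(Duration::from_millis(10)); // stand-in for file reads
            }
        }));
        Self {
            bg_state,
            join_handle,
        }
    }
}

impl Drop for Owner {
    fn drop(&mut self) {
        // Mirrors the new Drop for SharedBufferInternal: signal, then join.
        self.bg_state.stop.store(true, Ordering::Relaxed);
        if let Some(handle) = self.join_handle.take() {
            handle.join().unwrap();
        }
    }
}

fn main() {
    let owner = Owner::new();
    drop(owner); // returns promptly; the worker holds no refcount on Owner
}

Dropping the last `Owner` reliably stops the worker, which is exactly the behavior the new `Drop for SharedBufferInternal` in the diff relies on.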
@@ -29,11 +29,9 @@ const CHUNK_SIZE_DEFAULT: usize = 100_000_000;
 type OneSharedBuffer = Arc<Vec<u8>>;
 
 struct SharedBufferInternal {
-    // error encountered during read
-    error: RwLock<std::io::Result<usize>>,
-    bg_reader: Mutex<Option<JoinHandle<()>>>,
-    bg_eof_reached: AtomicBool,
-    stop: AtomicBool,
+    bg_reader_data: Arc<SharedBufferBgReader>,
+
+    bg_reader_join_handle: Mutex<Option<JoinHandle<()>>>,
 
     // Keep track of the next read location per outstanding client.
     // index is client's my_client_index.
@@ -41,21 +39,10 @@ struct SharedBufferInternal
     // Any buffer at index < min(clients) can be recycled or destroyed.
     clients: RwLock<Vec<usize>>,
 
-    // bg thread reads to 'newly_read_data' and signals
-    newly_read_data: RwLock<Vec<OneSharedBuffer>>,
-    // set when newly_read_data gets new data written to it and can be transferred
-    newly_read_data_signal: WaitableCondvar,
     // unpacking callers read from 'data'. newly_read_data is transferred to 'data when 'data' is exhausted.
     // This minimizes lock contention since bg file reader has to have almost constant write access.
     data: RwLock<Vec<OneSharedBuffer>>,
 
-    // currently available set of buffers for bg to read into
-    // during operation, this is exhausted as the bg reads ahead
-    // As all clients are done with an earlier buffer, it is recycled by being put back into this vec for the bg thread to pull out.
-    buffers: RwLock<Vec<OneSharedBuffer>>,
-    // signaled when a new buffer is added to buffers. This throttles the bg reading.
-    new_buffer_signal: WaitableCondvar,
-
     // it is convenient to have one of these around
     empty_buffer: OneSharedBuffer,
 }
@@ -74,44 +61,26 @@ impl SharedBuffer
         reader: T,
     ) -> Self {
         let instance = SharedBufferInternal {
-            buffers: RwLock::new(Self::alloc_buffers(total_buffer_budget, chunk_size)),
+            bg_reader_data: Arc::new(SharedBufferBgReader::new(total_buffer_budget, chunk_size)),
             data: RwLock::new(vec![OneSharedBuffer::default()]), // initialize with 1 vector of empty data at data[0]
-            error: RwLock::new(Ok(0)),
 
             // default values
-            newly_read_data: RwLock::default(),
-            bg_reader: Mutex::default(),
-            bg_eof_reached: AtomicBool::default(),
-            stop: AtomicBool::default(),
-            newly_read_data_signal: WaitableCondvar::default(),
-            new_buffer_signal: WaitableCondvar::default(),
+            bg_reader_join_handle: Mutex::default(),
             clients: RwLock::default(),
             empty_buffer: OneSharedBuffer::default(),
         };
         let instance = Arc::new(instance);
-        let instance_ = instance.clone();
+        let bg_reader_data = instance.bg_reader_data.clone();
 
         let handle = Builder::new()
             .name("solana-compressed_file_reader".to_string())
             .spawn(move || {
-                instance_.read_entire_file_in_bg(reader);
+                // importantly, this thread does NOT hold a refcount on the arc of 'instance'
+                bg_reader_data.read_entire_file_in_bg(reader);
             });
-        *instance.bg_reader.lock().unwrap() = Some(handle.unwrap());
+        *instance.bg_reader_join_handle.lock().unwrap() = Some(handle.unwrap());
         Self { instance }
     }
-    fn alloc_buffers(total_buffer_budget: usize, chunk_size: usize) -> Vec<OneSharedBuffer> {
-        assert!(total_buffer_budget > 0);
-        assert!(chunk_size > 0);
-        let buffers = Self::num_buffers(total_buffer_budget, chunk_size);
-        let initial_vector_count = buffers;
-        (0..initial_vector_count)
-            .into_iter()
-            .map(|_| Arc::new(vec![0u8; chunk_size]))
-            .collect()
-    }
-    fn num_buffers(total_buffer_budget: usize, chunk_size: usize) -> usize {
-        std::cmp::max(1, total_buffer_budget / chunk_size) // at least 1 buffer
-    }
 }
 
 pub struct SharedBufferReader {
@@ -131,8 +100,8 @@ pub struct SharedBufferReader {
 
 impl Drop for SharedBufferInternal {
     fn drop(&mut self) {
-        if let Some(handle) = self.bg_reader.lock().unwrap().take() {
-            self.stop.store(true, Ordering::Relaxed);
+        if let Some(handle) = self.bg_reader_join_handle.lock().unwrap().take() {
+            self.bg_reader_data.stop.store(true, Ordering::Relaxed);
             handle.join().unwrap();
         }
     }
@@ -144,7 +113,67 @@ impl Drop for SharedBufferReader {
     }
 }
 
-impl SharedBufferInternal {
+#[derive(Debug)]
+struct SharedBufferBgReader {
+    stop: AtomicBool,
+    // error encountered during read
+    error: RwLock<std::io::Result<usize>>,
+    // bg thread reads to 'newly_read_data' and signals
+    newly_read_data: RwLock<Vec<OneSharedBuffer>>,
+    // set when newly_read_data gets new data written to it and can be transferred
+    newly_read_data_signal: WaitableCondvar,
+
+    // currently available set of buffers for bg to read into
+    // during operation, this is exhausted as the bg reads ahead
+    // As all clients are done with an earlier buffer, it is recycled by being put back into this vec for the bg thread to pull out.
+    buffers: RwLock<Vec<OneSharedBuffer>>,
+    // signaled when a new buffer is added to buffers. This throttles the bg reading.
+    new_buffer_signal: WaitableCondvar,
+
+    bg_eof_reached: AtomicBool,
+}
+
+impl SharedBufferBgReader {
+    fn new(total_buffer_budget: usize, chunk_size: usize) -> Self {
+        SharedBufferBgReader {
+            buffers: RwLock::new(Self::alloc_buffers(total_buffer_budget, chunk_size)),
+            error: RwLock::new(Ok(0)),
+
+            // easy defaults
+            stop: AtomicBool::new(false),
+            newly_read_data: RwLock::default(),
+            newly_read_data_signal: WaitableCondvar::default(),
+            new_buffer_signal: WaitableCondvar::default(),
+            bg_eof_reached: AtomicBool::default(),
+        }
+    }
+
+    fn default_wait_timeout() -> Duration {
+        Duration::from_millis(100) // short enough to be unnoticable in case of trouble, long enough for efficient waiting
+    }
+    fn wait_for_new_buffer(&self) -> bool {
+        self.new_buffer_signal
+            .wait_timeout(Self::default_wait_timeout())
+    }
+
+    fn alloc_buffers(total_buffer_budget: usize, chunk_size: usize) -> Vec<OneSharedBuffer> {
+        assert!(total_buffer_budget > 0);
+        assert!(chunk_size > 0);
+        let buffers = Self::num_buffers(total_buffer_budget, chunk_size);
+        let initial_vector_count = buffers;
+        (0..initial_vector_count)
+            .into_iter()
+            .map(|_| Arc::new(vec![0u8; chunk_size]))
+            .collect()
+    }
+    fn num_buffers(total_buffer_budget: usize, chunk_size: usize) -> usize {
+        std::cmp::max(1, total_buffer_budget / chunk_size) // at least 1 buffer
+    }
+    fn set_error(&self, error: std::io::Error) {
+        *self.error.write().unwrap() = Err(error);
+        self.newly_read_data_signal.notify_all(); // any client waiting for new data needs to wake up and check for errors
+    }
+
     // read ahead the entire file.
     // This is governed by the supply of buffers.
     // Buffers are likely limited to cap memory usage.
@@ -244,27 +273,20 @@ impl SharedBufferInternal {
             self.error.read().unwrap()
         );
     }
-    fn set_error(&self, error: std::io::Error) {
-        *self.error.write().unwrap() = Err(error);
-        self.newly_read_data_signal.notify_all(); // any client waiting for new data needs to wake up and check for errors
-    }
-    fn default_wait_timeout() -> Duration {
-        Duration::from_millis(100) // short enough to be unnoticable in case of trouble, long enough for efficient waiting
-    }
-    fn wait_for_new_buffer(&self) -> bool {
-        self.new_buffer_signal
-            .wait_timeout(Self::default_wait_timeout())
-    }
+}
+
+impl SharedBufferInternal {
     fn wait_for_newly_read_data(&self) -> bool {
-        self.newly_read_data_signal
-            .wait_timeout(Self::default_wait_timeout())
+        self.bg_reader_data
+            .newly_read_data_signal
+            .wait_timeout(SharedBufferBgReader::default_wait_timeout())
     }
     // bg reader uses write lock on 'newly_read_data' each time a buffer is read or recycled
     // client readers read from 'data' using read locks
     // when all of 'data' has been exhausted by clients, 1 client needs to transfer from 'newly_read_data' to 'data' one time.
     // returns true if any data was added to 'data'
     fn transfer_data_from_bg(&self) -> bool {
-        let mut from_lock = self.newly_read_data.write().unwrap();
+        let mut from_lock = self.bg_reader_data.newly_read_data.write().unwrap();
         if from_lock.is_empty() {
             // no data available from bg
             return false;
@@ -280,7 +302,7 @@ impl SharedBufferInternal {
         true // data was transferred
     }
     fn has_reached_eof(&self) -> bool {
-        self.bg_eof_reached.load(Ordering::Relaxed)
+        self.bg_reader_data.bg_eof_reached.load(Ordering::Relaxed)
     }
 }
 
@@ -351,8 +373,14 @@ impl SharedBufferReader {
         if !eof {
             // if !eof, recycle this buffer and notify waiting reader(s)
             // if eof, just drop buffer this buffer since it isn't needed for reading anymore
-            self.instance.buffers.write().unwrap().push(remove);
-            self.instance.new_buffer_signal.notify_all(); // new buffer available for bg reader
+            self.instance
+                .bg_reader_data
+                .buffers
+                .write()
+                .unwrap()
+                .push(remove);
+            self.instance.bg_reader_data.new_buffer_signal.notify_all();
+            // new buffer available for bg reader
         }
     }
 }
@@ -427,7 +455,7 @@ impl Read for SharedBufferReader {
             // Since the bg reader could not satisfy our read, now is a good time to check to see if the bg reader encountered an error.
             // Note this is a write lock because we want to get the actual error detected and return it here and avoid races with other readers if we tried a read and then subsequent write lock.
             // This would be simpler if I could clone an io error.
-            let mut error = instance.error.write().unwrap();
+            let mut error = instance.bg_reader_data.error.write().unwrap();
             if error.is_err() {
                 // replace the current error (with AN error instead of ok)
                 let mut stored_error = Err(Self::default_error());
@@ -738,7 +766,7 @@ pub mod tests {
     }
 
     fn adjusted_buffer_size(total_buffer_budget: usize, chunk_size: usize) -> usize {
-        let num_buffers = SharedBuffer::num_buffers(total_buffer_budget, chunk_size);
+        let num_buffers = SharedBufferBgReader::num_buffers(total_buffer_budget, chunk_size);
         num_buffers * chunk_size
     }
 
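The recycling handshake visible in the hunks above (clients push exhausted buffers back into `buffers` and notify `new_buffer_signal`; the bg reader waits on that signal when the pool runs dry, which throttles its read-ahead) can be approximated with std primitives. `WaitableCondvar` is a Solana-internal helper; the `Pool`, `take`, and `recycle` names below are hypothetical, a sketch of the idea rather than the crate's API.

use std::{
    sync::{Arc, Condvar, Mutex},
    thread,
    time::Duration,
};

type Buf = Arc<Vec<u8>>;

struct Pool {
    free: Mutex<Vec<Buf>>,      // recycled buffers available to the bg reader
    new_buffer_signal: Condvar, // notified when a client recycles a buffer
}

impl Pool {
    // bg reader side: take a free buffer, waiting briefly if none is available
    fn take(&self) -> Option<Buf> {
        let mut free = self.free.lock().unwrap();
        if free.is_empty() {
            // throttles read-ahead until a client recycles a buffer (or timeout)
            let (guard, _timed_out) = self
                .new_buffer_signal
                .wait_timeout(free, Duration::from_millis(100))
                .unwrap();
            free = guard;
        }
        free.pop()
    }

    // client side: return a fully consumed buffer and wake the bg reader
    fn recycle(&self, buf: Buf) {
        self.free.lock().unwrap().push(buf);
        self.new_buffer_signal.notify_all();
    }
}

fn main() {
    let pool = Arc::new(Pool {
        free: Mutex::new(vec![Arc::new(vec![0u8; 4096])]),
        new_buffer_signal: Condvar::new(),
    });
    let bg = {
        let pool = Arc::clone(&pool);
        thread::spawn(move || {
            // a real bg reader would fill the buffer from the file; this just recycles it
            if let Some(buf) = pool.take() {
                pool.recycle(buf);
            }
        })
    };
    bg.join().unwrap();
}

The timeout-based wait matches the diff's design: the bg thread never blocks indefinitely, so it can also observe `stop` promptly during shutdown.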