From 090fbeca2481927a0543d22750ae32de59fd45bd Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" <75863576+jeffwashington@users.noreply.github.com> Date: Fri, 16 Jul 2021 13:17:03 -0500 Subject: [PATCH] lazy allocate buffers for bg reader in untar (#18640) --- runtime/src/shared_buffer_reader.rs | 40 +++++++++++++++-------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/runtime/src/shared_buffer_reader.rs b/runtime/src/shared_buffer_reader.rs index 5bf7083661..7c8925caae 100644 --- a/runtime/src/shared_buffer_reader.rs +++ b/runtime/src/shared_buffer_reader.rs @@ -60,8 +60,10 @@ impl SharedBuffer { chunk_size: usize, reader: T, ) -> Self { + assert!(total_buffer_budget > 0); + assert!(chunk_size > 0); let instance = SharedBufferInternal { - bg_reader_data: Arc::new(SharedBufferBgReader::new(total_buffer_budget, chunk_size)), + bg_reader_data: Arc::new(SharedBufferBgReader::new()), data: RwLock::new(vec![OneSharedBuffer::default()]), // initialize with 1 vector of empty data at data[0] // default values @@ -76,7 +78,7 @@ impl SharedBuffer { .name("solana-compressed_file_reader".to_string()) .spawn(move || { // importantly, this thread does NOT hold a refcount on the arc of 'instance' - bg_reader_data.read_entire_file_in_bg(reader); + bg_reader_data.read_entire_file_in_bg(reader, total_buffer_budget, chunk_size); }); *instance.bg_reader_join_handle.lock().unwrap() = Some(handle.unwrap()); Self { instance } @@ -134,9 +136,9 @@ struct SharedBufferBgReader { } impl SharedBufferBgReader { - fn new(total_buffer_budget: usize, chunk_size: usize) -> Self { + fn new() -> Self { SharedBufferBgReader { - buffers: RwLock::new(Self::alloc_buffers(total_buffer_budget, chunk_size)), + buffers: RwLock::new(vec![]), error: RwLock::new(Ok(0)), // easy defaults @@ -155,17 +157,6 @@ impl SharedBufferBgReader { self.new_buffer_signal .wait_timeout(Self::default_wait_timeout()) } - - fn alloc_buffers(total_buffer_budget: usize, chunk_size: usize) -> Vec { - assert!(total_buffer_budget > 0); - assert!(chunk_size > 0); - let buffers = Self::num_buffers(total_buffer_budget, chunk_size); - let initial_vector_count = buffers; - (0..initial_vector_count) - .into_iter() - .map(|_| Arc::new(vec![0u8; chunk_size])) - .collect() - } fn num_buffers(total_buffer_budget: usize, chunk_size: usize) -> usize { std::cmp::max(1, total_buffer_budget / chunk_size) // at least 1 buffer } @@ -179,7 +170,12 @@ impl SharedBufferBgReader { // Buffers are likely limited to cap memory usage. // A buffer is recycled after the last client finishes reading from it. // When a buffer is available (initially or recycled), this code wakes up and reads into that buffer. - fn read_entire_file_in_bg(&self, mut reader: T) { + fn read_entire_file_in_bg( + &self, + mut reader: T, + total_buffer_budget: usize, + chunk_size: usize, + ) { let now = std::time::Instant::now(); let mut read_us = 0; @@ -187,6 +183,7 @@ impl SharedBufferBgReader { let mut wait_us = 0; let mut total_bytes = 0; let mut error = SharedBufferReader::default_error(); + let mut remaining_buffers_to_allocate = Self::num_buffers(total_buffer_budget, chunk_size); loop { if self.stop.load(Ordering::Relaxed) { // unsure what error is most appropriate here. @@ -197,11 +194,15 @@ impl SharedBufferBgReader { let mut buffers = self.buffers.write().unwrap(); let buffer = buffers.pop(); drop(buffers); - let (dest_size, mut dest_data) = if let Some(dest_data) = buffer { + let mut dest_data = if let Some(dest_data) = buffer { // assert that this should not result in a vector copy // These are internal buffers and should not be held by anyone else. assert_eq!(Arc::strong_count(&dest_data), 1); - (dest_data.len(), dest_data) + dest_data + } else if remaining_buffers_to_allocate > 0 { + // we still haven't allocated all the buffers we are allowed to allocate + remaining_buffers_to_allocate -= 1; + Arc::new(vec![0; chunk_size]) } else { // nowhere to write, so wait for a buffer to become available let mut wait_for_new_buffer = Measure::start("wait_for_new_buffer"); @@ -210,11 +211,12 @@ impl SharedBufferBgReader { wait_us += wait_for_new_buffer.as_us(); continue; // check stop, try to get a buffer again }; + let target = Arc::make_mut(&mut dest_data); + let dest_size = target.len(); let mut bytes_read = 0; let mut eof = false; let mut error_received = false; - let target = Arc::make_mut(&mut dest_data); while bytes_read < dest_size { let mut time_read = Measure::start("read");