Don't use pinned memory when unnecessary (#17832)

There have been reports of excessive GPU memory usage and errors
from cudaHostRegister. In some cases pinning is not required, so
those code paths now use unpinned memory.
This commit is contained in:
sakridge 2021-06-14 16:10:04 +02:00 committed by GitHub
parent d4cc975fe9
commit eeee75c5be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 47 additions and 7 deletions

View File

@ -92,6 +92,7 @@ fn main() -> Result<()> {
recycler.clone(),
"bench-streamer-test",
1,
true,
));
}

View File

@ -113,6 +113,7 @@ impl FetchStage {
recycler.clone(),
"fetch_stage",
coalesce_ms,
true,
)
});
@ -125,6 +126,7 @@ impl FetchStage {
recycler.clone(),
"fetch_forward_stage",
coalesce_ms,
true,
)
});

View File

@ -515,7 +515,7 @@ impl ServeRepair {
if let Some(packet) = packet {
inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
return Some(Packets::new_with_recycler_data(
return Some(Packets::new_unpinned_with_recycler_data(
recycler,
"run_window_request",
vec![packet],
@ -555,7 +555,7 @@ impl ServeRepair {
from_addr,
nonce,
)?;
return Some(Packets::new_with_recycler_data(
return Some(Packets::new_unpinned_with_recycler_data(
recycler,
"run_highest_window_request",
vec![packet],
@ -572,7 +572,7 @@ impl ServeRepair {
max_responses: usize,
nonce: Nonce,
) -> Option<Packets> {
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
let mut res = Packets::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan");
if let Some(blockstore) = blockstore {
// Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blockstore.meta(slot) {

View File

@ -33,6 +33,7 @@ impl ServeRepairService {
Recycler::default(),
"serve_repair_receiver",
1,
false,
);
let (response_sender, response_receiver) = channel();
let t_responder =

View File

@ -150,6 +150,7 @@ impl ShredFetchStage {
recycler.clone(),
"packet_modifier",
1,
true,
)
})
.collect();

View File

@ -2051,7 +2051,8 @@ impl ClusterInfo {
.process_pull_requests(callers.cloned(), timestamp());
let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let mut packets =
Packets::new_unpinned_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
let mut rng = rand::thread_rng();
let check_pull_request =
@ -2323,7 +2324,7 @@ impl ClusterInfo {
None
} else {
let packets =
Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets);
Packets::new_unpinned_with_recycler_data(recycler, "handle_ping_messages", packets);
Some(packets)
}
}

View File

@ -54,6 +54,7 @@ impl GossipService {
Recycler::default(),
"gossip_receiver",
1,
false,
);
let (response_sender, response_receiver) = channel();
let (consume_sender, listen_receiver) = channel();

View File

@ -146,6 +146,10 @@ impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a
}
impl<T: Clone + Default + Sized> PinnedVec<T> {
/// Forwards `size` to `reserve` on the inner `x` container; no pinning
/// logic is involved here (contrast with `reserve_and_pin`).
///
/// NOTE(review): if `x` is a `Vec`, `size` is interpreted as *additional*
/// capacity, whereas `reserve_and_pin` compares `size` against the total
/// `capacity()` — confirm the difference is intended.
pub fn reserve(&mut self, size: usize) {
self.x.reserve(size);
}
pub fn reserve_and_pin(&mut self, size: usize) {
if self.x.capacity() < size {
if self.pinned {

View File

@ -28,11 +28,22 @@ impl Packets {
Packets { packets }
}
/// Builds a `Packets` batch from a buffer obtained via
/// `recycler.allocate(name)`, reserving space with `reserve(size)` instead
/// of `reserve_and_pin(size)`.
///
/// Counterpart of `new_with_recycler` for call sites that do not need the
/// buffer to be pinned.
///
/// NOTE(review): whether a recycled buffer can still carry a pinned state
/// from an earlier use depends on the recycler — verify against
/// `allocate`.
pub fn new_unpinned_with_recycler(
recycler: PacketsRecycler,
size: usize,
name: &'static str,
) -> Self {
let mut packets = recycler.allocate(name);
// Plain capacity reservation only; this constructor never asks for pinning.
packets.reserve(size);
Packets { packets }
}
/// Builds a `Packets` batch from a buffer obtained via
/// `recycler.allocate(name)` and calls `reserve_and_pin(size)` on it.
///
/// Use `new_unpinned_with_recycler` when the buffer does not need to be
/// pinned.
pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self {
let mut packets = recycler.allocate(name);
// Reserve capacity and request pinning in a single step.
packets.reserve_and_pin(size);
Packets { packets }
}
pub fn new_with_recycler_data(
recycler: &PacketsRecycler,
name: &'static str,
@ -43,6 +54,16 @@ impl Packets {
vec
}
/// Builds a `Packets` batch via `new_unpinned_with_recycler`, sized to
/// `packets.len()`, and then appends the supplied `packets` to it.
///
/// `recycler` is taken by reference and cloned for the inner constructor;
/// `name` is passed through unchanged.
pub fn new_unpinned_with_recycler_data(
recycler: &PacketsRecycler,
name: &'static str,
mut packets: Vec<Packet>,
) -> Self {
// Size the batch for exactly the packets about to be appended.
let mut vec = Self::new_unpinned_with_recycler(recycler.clone(), packets.len(), name);
vec.packets.append(&mut packets);
vec
}
pub fn set_addr(&mut self, addr: &SocketAddr) {
for m in self.packets.iter_mut() {
m.meta.set_addr(&addr);
@ -76,7 +97,7 @@ pub fn to_packets_with_destination<T: Serialize>(
recycler: PacketsRecycler,
dests_and_data: &[(SocketAddr, T)],
) -> Packets {
let mut out = Packets::new_with_recycler(
let mut out = Packets::new_unpinned_with_recycler(
recycler,
dests_and_data.len(),
"to_packets_with_destination",

View File

@ -36,13 +36,18 @@ fn recv_loop(
recycler: &PacketsRecycler,
name: &'static str,
coalesce_ms: u64,
use_pinned_memory: bool,
) -> Result<()> {
let mut recv_count = 0;
let mut call_count = 0;
let mut now = Instant::now();
let mut num_max_received = 0; // Number of times maximum packets were received
loop {
let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name);
let mut msgs = if use_pinned_memory {
Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name)
} else {
Packets::with_capacity(PACKETS_PER_BATCH)
};
loop {
// Check for exit signal, even if socket is busy
// (for instance the leader transaction socket)
@ -84,6 +89,7 @@ pub fn receiver(
recycler: PacketsRecycler,
name: &'static str,
coalesce_ms: u64,
use_pinned_memory: bool,
) -> JoinHandle<()> {
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
if res.is_err() {
@ -100,6 +106,7 @@ pub fn receiver(
&recycler.clone(),
name,
coalesce_ms,
use_pinned_memory,
);
})
.unwrap()
@ -211,6 +218,7 @@ mod test {
Recycler::default(),
"test",
1,
true,
);
let t_responder = {
let (s_responder, r_responder) = channel();