Spawn for each stream (#26086)

This commit is contained in:
sakridge 2022-06-27 12:03:40 -07:00 committed by GitHub
parent 72dc813ada
commit 2cc48a650b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 31 additions and 25 deletions

View File

@ -237,33 +237,39 @@ async fn handle_connection(
Ok(mut stream) => {
stats.total_streams.fetch_add(1, Ordering::Relaxed);
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
let mut maybe_batch = None;
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS),
stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await
{
if handle_chunk(
&chunk,
&mut maybe_batch,
&remote_addr,
&packet_sender,
stats.clone(),
stake,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
let stream_exit = stream_exit.clone();
let stats = stats.clone();
let packet_sender = packet_sender.clone();
let last_update = last_update.clone();
tokio::spawn(async move {
let mut maybe_batch = None;
while !stream_exit.load(Ordering::Relaxed) {
if let Ok(chunk) = tokio::time::timeout(
Duration::from_millis(WAIT_FOR_STREAM_TIMEOUT_MS),
stream.read_chunk(PACKET_DATA_SIZE, false),
)
.await
{
if handle_chunk(
&chunk,
&mut maybe_batch,
&remote_addr,
&packet_sender,
stats.clone(),
stake,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
}
} else {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
}
} else {
debug!("Timeout in receiving on stream");
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
}
}
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
});
}
Err(e) => {
debug!("stream error: {:?}", e);