Fixing the performance issues related to stream management

This commit is contained in:
godmodegalactus 2024-06-05 15:38:23 +02:00
parent 828cfe38a3
commit e304c607fd
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
4 changed files with 39 additions and 11 deletions

View File

@ -233,7 +233,10 @@ pub fn create_quiche_client_thread(
} }
for stream_id in connection.writable() { for stream_id in connection.writable() {
handle_writable(&mut connection, &mut partial_responses, stream_id); if let Err(e) = handle_writable(&mut connection, &mut partial_responses, stream_id)
{
log::error!("Error writing message on writable stream : {e:?}");
}
} }
if connection.is_closed() { if connection.is_closed() {
@ -375,7 +378,6 @@ mod tests {
rx_sent_queue, rx_sent_queue,
CompressionType::Lz4Fast(8), CompressionType::Lz4Fast(8),
true, true,
100,
) { ) {
log::error!("Server loop closed by error : {e}"); log::error!("Server loop closed by error : {e}");
} }

View File

@ -133,7 +133,12 @@ pub async fn main() {
sleep(Duration::from_secs(1)); sleep(Duration::from_secs(1));
println!("Subscribing"); println!("Subscribing");
client client
.subscribe(vec![Filter::BlockAll, Filter::Slot, Filter::BlockMeta]) .subscribe(vec![
Filter::BlockAll,
Filter::Slot,
Filter::BlockMeta,
Filter::AccountsAll,
])
.await .await
.unwrap(); .unwrap();
println!("Subscribed"); println!("Subscribed");

View File

@ -15,6 +15,7 @@ pub fn send_message(
) -> std::result::Result<(), quiche::Error> { ) -> std::result::Result<(), quiche::Error> {
let written = match connection.stream_send(stream_id, message, true) { let written = match connection.stream_send(stream_id, message, true) {
Ok(v) => v, Ok(v) => v,
Err(quiche::Error::Done) => 0,
Err(e) => { Err(e) => {
return Err(e); return Err(e);
} }
@ -36,19 +37,25 @@ pub fn handle_writable(
conn: &mut quiche::Connection, conn: &mut quiche::Connection,
partial_responses: &mut PartialResponses, partial_responses: &mut PartialResponses,
stream_id: u64, stream_id: u64,
) { ) -> std::result::Result<(), quiche::Error> {
log::trace!("{} stream {} is writable", conn.trace_id(), stream_id); log::trace!("{} stream {} is writable", conn.trace_id(), stream_id);
let resp = match partial_responses.get_mut(&stream_id) { let resp = match partial_responses.get_mut(&stream_id) {
Some(s) => s, Some(s) => s,
None => { None => {
return; // stream has finished
let _ = conn.stream_send(stream_id, b"", true);
return Ok(());
} }
}; };
let body = &resp.binary; let body = &resp.binary;
let written = match conn.stream_send(stream_id, body, true) { let written = match conn.stream_send(stream_id, body, true) {
Ok(v) => v, Ok(v) => v,
Err(quiche::Error::Done) => {
// done writing
return Ok(());
}
Err(e) => { Err(e) => {
partial_responses.remove(&stream_id); partial_responses.remove(&stream_id);
@ -56,12 +63,12 @@ pub fn handle_writable(
"{} stream id :{stream_id} send failed {e:?}", "{} stream id :{stream_id} send failed {e:?}",
conn.trace_id() conn.trace_id()
); );
return; return Err(e);
} }
}; };
if written == 0 { if written == 0 {
return; return Ok(());
} }
if written == resp.binary.len() { if written == resp.binary.len() {
@ -78,4 +85,5 @@ pub fn handle_writable(
resp.binary = resp.binary[written..].to_vec(); resp.binary = resp.binary[written..].to_vec();
resp.written += written; resp.written += written;
} }
Ok(())
} }

View File

@ -412,7 +412,11 @@ fn create_client_task(
{ {
for stream_id in connection.writable() { for stream_id in connection.writable() {
number_of_writable_streams.fetch_add(1, std::sync::atomic::Ordering::Relaxed); number_of_writable_streams.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
handle_writable(&mut connection, &mut partial_responses, stream_id); if let Err(e) =
handle_writable(&mut connection, &mut partial_responses, stream_id)
{
log::error!("Error writing {e:?}");
}
} }
while partial_responses.len() < max_allowed_partial_responses { while partial_responses.len() < max_allowed_partial_responses {
@ -433,13 +437,22 @@ fn create_client_task(
true true
} else { } else {
messages_added.fetch_add(1, std::sync::atomic::Ordering::Relaxed); messages_added.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
send_message( match send_message(
&mut connection, &mut connection,
&mut partial_responses, &mut partial_responses,
stream_id, stream_id,
&message, &message,
) ) {
.is_err() Ok(_) => false,
Err(quiche::Error::Done) => {
// done writing / queue is full
break;
}
Err(e) => {
log::error!("error sending message : {e:?}");
true
}
}
} }
} }
Err(e) => { Err(e) => {