improving performance of quiche server
This commit is contained in:
parent
1d4473b102
commit
4b9a9ba152
|
@ -59,11 +59,9 @@ pub fn handle_writable(
|
|||
.expect("should have a stream id");
|
||||
let body = &resp.binary;
|
||||
|
||||
let written = match conn.stream_send(stream_id, body, true) {
|
||||
let written = match conn.stream_send(stream_id, body, false) {
|
||||
Ok(v) => v,
|
||||
|
||||
Err(quiche::Error::Done) => 0,
|
||||
|
||||
Err(e) => {
|
||||
partial_responses.remove(&stream_id);
|
||||
|
||||
|
@ -73,6 +71,13 @@ pub fn handle_writable(
|
|||
};
|
||||
if resp.written == resp.binary.len() {
|
||||
partial_responses.remove(&stream_id);
|
||||
match conn.stream_send(stream_id, &[], true) {
|
||||
Ok(_) => {}
|
||||
Err(quiche::Error::Done) => {}
|
||||
Err(e) => {
|
||||
log::error!("{} fin stream failed {:?}", conn.trace_id(), e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
resp.binary = resp.binary[written..].to_vec();
|
||||
resp.written += written;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::{collections::HashMap, net::SocketAddr};
|
||||
|
||||
use itertools::Itertools;
|
||||
use mio::net::UdpSocket;
|
||||
use mio::{net::UdpSocket, Token};
|
||||
use quiche::{Connection, ConnectionId};
|
||||
use ring::rand::SystemRandom;
|
||||
|
||||
|
@ -31,6 +31,10 @@ struct Client {
|
|||
pub next_stream: u64,
|
||||
}
|
||||
|
||||
const NETWORK_READ: Token = Token(0);
|
||||
const NETWORK_WRITE: Token = Token(1);
|
||||
const CHANNEL_READ: Token = Token(2);
|
||||
|
||||
type ClientMap = HashMap<quiche::ConnectionId<'static>, Client>;
|
||||
|
||||
pub fn server_loop(
|
||||
|
@ -51,13 +55,19 @@ pub fn server_loop(
|
|||
|
||||
poll.registry().register(
|
||||
&mut socket,
|
||||
mio::Token(0),
|
||||
NETWORK_READ,
|
||||
mio::Interest::READABLE | mio::Interest::WRITABLE,
|
||||
)?;
|
||||
|
||||
poll.registry().register(
|
||||
&mut socket,
|
||||
NETWORK_WRITE,
|
||||
mio::Interest::READABLE | mio::Interest::WRITABLE,
|
||||
)?;
|
||||
|
||||
poll.registry().register(
|
||||
&mut message_send_queue,
|
||||
mio::Token(1),
|
||||
CHANNEL_READ,
|
||||
mio::Interest::READABLE,
|
||||
)?;
|
||||
|
||||
|
@ -71,16 +81,18 @@ pub fn server_loop(
|
|||
|
||||
poll.poll(&mut events, timeout).unwrap();
|
||||
|
||||
let network_updates = true;
|
||||
let channel_updates = true;
|
||||
if network_updates {
|
||||
'read: loop {
|
||||
if events.is_empty() {
|
||||
log::debug!("connection timed out");
|
||||
clients.values_mut().for_each(|c| c.conn.on_timeout());
|
||||
break 'read;
|
||||
}
|
||||
let network_read = events.iter().any(|x| x.token() == NETWORK_READ);
|
||||
let network_write = events.iter().any(|x| x.token() == NETWORK_WRITE);
|
||||
let channel_updates = events.iter().any(|x| x.token() == CHANNEL_READ);
|
||||
|
||||
if events.is_empty() {
|
||||
log::debug!("connection timed out");
|
||||
clients.values_mut().for_each(|c| c.conn.on_timeout());
|
||||
continue;
|
||||
}
|
||||
|
||||
if network_read {
|
||||
'read: loop {
|
||||
let (len, from) = match socket.recv_from(&mut buf) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
|
@ -228,10 +240,6 @@ pub fn server_loop(
|
|||
log::debug!("{} processed {} bytes", client.conn.trace_id(), read);
|
||||
|
||||
if client.conn.is_in_early_data() || client.conn.is_established() {
|
||||
for stream_id in client.conn.writable() {
|
||||
handle_writable(&mut client.conn, &mut client.partial_responses, stream_id);
|
||||
}
|
||||
|
||||
// Process all readable streams.
|
||||
for stream in client.conn.readable() {
|
||||
let message =
|
||||
|
@ -260,6 +268,7 @@ pub fn server_loop(
|
|||
.iter_mut()
|
||||
.filter(|(_, client)| {
|
||||
client.conn.is_established()
|
||||
&& !client.conn.is_closed()
|
||||
&& client.filters.iter().any(|x| x.allows(&message))
|
||||
})
|
||||
.map(|x| x.1)
|
||||
|
@ -297,6 +306,10 @@ pub fn server_loop(
|
|||
.expect("Message should be serializable in binary");
|
||||
for client in dispatch_to {
|
||||
let stream_id = client.next_stream;
|
||||
|
||||
client.next_stream =
|
||||
get_next_unidi(stream_id, true, maximum_concurrent_streams);
|
||||
|
||||
match client.conn.stream_priority(stream_id, priority, true) {
|
||||
Ok(_) => {
|
||||
log::trace!("priority was set correctly");
|
||||
|
@ -309,14 +322,12 @@ pub fn server_loop(
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
client.next_stream =
|
||||
get_next_unidi(stream_id, true, maximum_concurrent_streams);
|
||||
log::debug!(
|
||||
"dispatching {} on stream id : {}",
|
||||
binary.len(),
|
||||
stream_id
|
||||
);
|
||||
|
||||
if let Err(e) = send_message(
|
||||
&mut client.conn,
|
||||
&mut client.partial_responses,
|
||||
|
@ -342,34 +353,40 @@ pub fn server_loop(
|
|||
// Generate outgoing QUIC packets for all active connections and send
|
||||
// them on the UDP socket, until quiche reports that there are no more
|
||||
// packets to be sent.
|
||||
for client in clients.values_mut() {
|
||||
loop {
|
||||
let (write, send_info) = match client.conn.send(&mut out) {
|
||||
Ok(v) => v,
|
||||
|
||||
Err(quiche::Error::Done) => {
|
||||
log::debug!("{} done writing", client.conn.trace_id());
|
||||
break;
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
log::error!("{} send failed: {:?}", client.conn.trace_id(), e);
|
||||
|
||||
client.conn.close(false, 0x1, b"fail").ok();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = socket.send_to(&out[..write], send_info.to) {
|
||||
if e.kind() == std::io::ErrorKind::WouldBlock {
|
||||
log::debug!("send() would block");
|
||||
break;
|
||||
}
|
||||
|
||||
log::error!("send() failed: {:?}", e);
|
||||
if network_write || channel_updates {
|
||||
for client in clients.values_mut() {
|
||||
for stream_id in client.conn.writable() {
|
||||
handle_writable(&mut client.conn, &mut client.partial_responses, stream_id);
|
||||
}
|
||||
|
||||
log::debug!("{} written {} bytes", client.conn.trace_id(), write);
|
||||
loop {
|
||||
let (write, send_info) = match client.conn.send(&mut out) {
|
||||
Ok(v) => v,
|
||||
|
||||
Err(quiche::Error::Done) => {
|
||||
log::debug!("{} done writing", client.conn.trace_id());
|
||||
break;
|
||||
}
|
||||
|
||||
Err(e) => {
|
||||
log::error!("{} send failed: {:?}", client.conn.trace_id(), e);
|
||||
|
||||
client.conn.close(false, 0x1, b"fail").ok();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = socket.send_to(&out[..write], send_info.to) {
|
||||
if e.kind() == std::io::ErrorKind::WouldBlock {
|
||||
log::debug!("send() would block");
|
||||
break;
|
||||
}
|
||||
|
||||
log::error!("send() failed: {:?}", e);
|
||||
}
|
||||
|
||||
log::debug!("{} written {} bytes", client.conn.trace_id(), write);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue