Fixing issue with large data transfers

This commit is contained in:
godmodegalactus 2024-05-23 14:57:23 +02:00
parent 46a01c8434
commit f8f35095e3
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
3 changed files with 140 additions and 151 deletions

View File

@ -168,9 +168,8 @@ pub fn client_loop(
Err(e) => {
log::error!("send failed: {:?}", e);
conn.close(false, 0x1, b"fail").ok();
bail!("writing failed");
break;
}
};

View File

@ -69,9 +69,15 @@ pub fn handle_writable(
return;
}
};
if resp.written == resp.binary.len() {
if written == 0 {
return;
}
if written == resp.binary.len() {
log::debug!("fin writing stream : {}", stream_id);
partial_responses.remove(&stream_id);
match conn.stream_send(stream_id, &[], true) {
match conn.stream_send(stream_id, b"", true) {
Ok(_) => {}
Err(quiche::Error::Done) => {}
Err(e) => {

View File

@ -68,6 +68,7 @@ pub fn server_loop(
let rng = SystemRandom::new();
let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap();
let mut clients = ClientMap::new();
loop {
let timeout = clients.values().filter_map(|c| c.conn.timeout()).min();
log::debug!("timeout : {}", timeout.unwrap_or_default().as_millis());
@ -77,10 +78,6 @@ pub fn server_loop(
let network_read = events
.iter()
.any(|x| x.token() == NETWORK_TOKEN && x.is_readable());
let network_write = events
.iter()
.any(|x| x.token() == NETWORK_TOKEN && x.is_writable());
let channel_updates = events.iter().any(|x| x.token() == CHANNEL_TOKEN);
if events.is_empty() {
log::debug!("connection timed out");
@ -258,149 +255,136 @@ pub fn server_loop(
}
}
}
if channel_updates {
while let Ok(message) = message_send_queue.try_recv() {
let dispatch_to = clients
.iter_mut()
.filter(|(_, client)| {
client.conn.is_established()
&& !client.conn.is_closed()
&& client.filters.iter().any(|x| x.allows(&message))
})
.map(|x| x.1)
.collect_vec();
if !dispatch_to.is_empty() {
let (message, priority) = match message {
ChannelMessage::Account(account, slot, _) => {
let slot_identifier = SlotIdentifier { slot };
let geyser_account = Account::new(
account.pubkey,
account.account,
compression_type,
slot_identifier,
account.write_version,
);
(Message::AccountMsg(geyser_account), 4)
}
ChannelMessage::Slot(slot, parent, commitment_level) => (
Message::SlotMsg(SlotMeta {
slot,
parent,
commitment_level,
}),
1,
),
ChannelMessage::BlockMeta(block_meta) => {
(Message::BlockMetaMsg(block_meta), 2)
}
ChannelMessage::Transaction(transaction) => {
(Message::TransactionMsg(transaction), 3)
}
};
let binary = bincode::serialize(&message)
.expect("Message should be serializable in binary");
for client in dispatch_to {
let stream_id = client.next_stream;
client.next_stream =
get_next_unidi(stream_id, true, maximum_concurrent_streams);
match client.conn.stream_priority(stream_id, priority, true) {
Ok(_) => {
log::trace!("priority was set correctly");
}
Err(e) => {
log::error!(
"Unable to set priority for the stream {}, error {}",
stream_id,
e
);
}
}
log::debug!(
"dispatching {} on stream id : {}",
binary.len(),
stream_id
);
if let Err(e) = send_message(
&mut client.conn,
&mut client.partial_responses,
stream_id,
&binary,
) {
log::error!("Error sending message : {e}");
if stop_laggy_client {
log::info!(
"Stopping laggy client : {}",
client.conn.trace_id()
);
if let Err(e) = client.conn.close(true, 1, b"laggy client") {
log::error!("error closing client : {}", e);
}
}
}
}
}
}
}
// Generate outgoing QUIC packets for all active connections and send
// them on the UDP socket, until quiche reports that there are no more
// packets to be sent.
if network_write || channel_updates {
for client in clients.values_mut() {
for stream_id in client.conn.writable() {
handle_writable(&mut client.conn, &mut client.partial_responses, stream_id);
}
loop {
let (write, send_info) = match client.conn.send(&mut out) {
Ok(v) => v,
Err(quiche::Error::Done) => {
log::debug!("{} done writing", client.conn.trace_id());
break;
}
Err(e) => {
log::error!("{} send failed: {:?}", client.conn.trace_id(), e);
client.conn.close(false, 0x1, b"fail").ok();
break;
}
};
if let Err(e) = socket.send_to(&out[..write], send_info.to) {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::debug!("send() would block");
break;
}
log::error!("send() failed: {:?}", e);
}
log::debug!("{} written {} bytes", client.conn.trace_id(), write);
}
}
}
// Garbage collect closed connections.
clients.retain(|_, ref mut c| {
log::debug!("Collecting garbage");
if c.conn.is_closed() {
log::debug!(
"{} connection collected {:?}",
c.conn.trace_id(),
c.conn.stats()
);
}
!c.conn.is_closed()
});
}
while let Ok(message) = message_send_queue.try_recv() {
let dispatch_to = clients
.iter_mut()
.filter(|(_, client)| {
client.conn.is_established()
&& !client.conn.is_closed()
&& client.filters.iter().any(|x| x.allows(&message))
})
.map(|x| x.1)
.collect_vec();
if !dispatch_to.is_empty() {
let (message, priority) = match message {
ChannelMessage::Account(account, slot, _) => {
let slot_identifier = SlotIdentifier { slot };
let geyser_account = Account::new(
account.pubkey,
account.account,
compression_type,
slot_identifier,
account.write_version,
);
(Message::AccountMsg(geyser_account), 4)
}
ChannelMessage::Slot(slot, parent, commitment_level) => (
Message::SlotMsg(SlotMeta {
slot,
parent,
commitment_level,
}),
1,
),
ChannelMessage::BlockMeta(block_meta) => (Message::BlockMetaMsg(block_meta), 2),
ChannelMessage::Transaction(transaction) => {
(Message::TransactionMsg(transaction), 3)
}
};
let binary =
bincode::serialize(&message).expect("Message should be serializable in binary");
for client in dispatch_to {
let stream_id = client.next_stream;
client.next_stream =
get_next_unidi(stream_id, true, maximum_concurrent_streams);
match client.conn.stream_priority(stream_id, priority, true) {
Ok(_) => {
log::trace!("priority was set correctly");
}
Err(e) => {
log::error!(
"Unable to set priority for the stream {}, error {}",
stream_id,
e
);
}
}
log::debug!("dispatching {} on stream id : {}", binary.len(), stream_id);
if let Err(e) = send_message(
&mut client.conn,
&mut client.partial_responses,
stream_id,
&binary,
) {
log::error!("Error sending message : {e}");
if stop_laggy_client {
log::info!("Stopping laggy client : {}", client.conn.trace_id());
if let Err(e) = client.conn.close(true, 1, b"laggy client") {
log::error!("error closing client : {}", e);
}
}
}
}
}
}
// Generate outgoing QUIC packets for all active connections and send
// them on the UDP socket, until quiche reports that there are no more
// packets to be sent.
for client in clients.values_mut() {
for stream_id in client.conn.writable() {
handle_writable(&mut client.conn, &mut client.partial_responses, stream_id);
}
loop {
let (write, send_info) = match client.conn.send(&mut out) {
Ok(v) => v,
Err(quiche::Error::Done) => {
log::debug!("{} done writing", client.conn.trace_id());
break;
}
Err(e) => {
log::error!(
"{} send failed: {:?}, closing connection",
client.conn.trace_id(),
e
);
client.conn.close(false, 0x1, b"fail").ok();
break;
}
};
if let Err(e) = socket.send_to(&out[..write], send_info.to) {
if e.kind() == std::io::ErrorKind::WouldBlock {
log::debug!("send() would block");
break;
}
log::error!("send() failed: {:?}", e);
}
log::debug!("{} written {} bytes", client.conn.trace_id(), write);
}
}
// Garbage collect closed connections.
clients.retain(|_, ref mut c| {
log::debug!("Collecting garbage");
if c.conn.is_closed() {
log::info!(
"{} connection closed {:?}",
c.conn.trace_id(),
c.conn.stats()
);
}
!c.conn.is_closed()
});
}
}