Merge pull request #13 from blockworks-foundation/using_handful_streams

Using handful streams
This commit is contained in:
galactus 2024-10-04 18:00:42 +02:00 committed by GitHub
commit f60fb5cd1c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 634 additions and 282 deletions

7
Cargo.lock generated
View File

@ -790,6 +790,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "circular-buffer"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b67261db007b5f4cf8cba393c1a5c511a5cc072339ce16e12aeba1d7b9b77946"
[[package]]
name = "clang-sys"
version = "1.8.1"
@ -2933,6 +2939,7 @@ version = "0.1.5"
dependencies = [
"anyhow",
"bincode",
"circular-buffer",
"itertools",
"log",
"lz4",

View File

@ -63,6 +63,7 @@ git-version = "0.3.5"
vergen = "8.2.1"
rand = "0.8.5"
tokio = "1.28.2"
circular-buffer = "0.1.9"
quic-geyser-common = {path = "common", version="0.1.5"}
quic-geyser-client = {path = "client", version="0.1.5"}

View File

@ -2,6 +2,7 @@ use std::{
net::SocketAddr,
sync::{atomic::AtomicBool, Arc},
time::{Duration, Instant},
u64,
};
use quic_geyser_common::{
@ -12,7 +13,7 @@ use quic_geyser_common::{
use quic_geyser_quiche_utils::{
quiche_reciever::{recv_message, ReadStreams},
quiche_sender::{handle_writable, send_message},
quiche_utils::{generate_cid_and_reset_token, get_next_unidi, PartialResponses},
quiche_utils::{generate_cid_and_reset_token, get_next_unidi, StreamSenderMap},
};
use anyhow::bail;
@ -137,16 +138,11 @@ pub fn create_quiche_client_thread(
poll.registry()
.register(&mut receiver, mio::Token(1), mio::Interest::READABLE)
.unwrap();
let maximum_streams = u64::MAX;
let mut current_stream_id = 3;
let mut out = [0; MAX_PAYLOAD_BUFFER];
let mut partial_responses = PartialResponses::new();
let stream_id = get_next_unidi(0, false, u64::MAX);
let mut stream_sender_map = StreamSenderMap::new();
let mut read_streams = ReadStreams::new();
let mut connected = false;
let mut instance = Instant::now();
let ping_message = Arc::new(bincode::serialize(&Message::Ping).unwrap());
// Generate a random source connection ID for the connection.
let rng = SystemRandom::new();
@ -159,12 +155,11 @@ pub fn create_quiche_client_thread(
if instance.elapsed() > Duration::from_secs(1) {
log::debug!("sending ping to the server");
current_stream_id = get_next_unidi(current_stream_id, false, maximum_streams);
if let Err(e) = send_message(
&mut connection,
&mut partial_responses,
current_stream_id,
ping_message.clone(),
&mut stream_sender_map,
stream_id,
Message::Ping.to_binary_stream(),
) {
log::error!("Error sending ping message : {e}");
}
@ -211,11 +206,16 @@ pub fn create_quiche_client_thread(
if connection.is_established() {
// io events
for stream_id in connection.readable() {
log::debug!("got readable stream");
let message = recv_message(&mut connection, &mut read_streams, stream_id);
match message {
Ok(Some(message)) => {
if let Err(e) = message_recv_queue.send(message) {
log::error!("Error sending message on the channel : {e}");
Ok(Some(messages)) => {
log::debug!("got messages: {}", messages.len());
for message in messages {
if let Err(e) = message_recv_queue.send(message) {
log::error!("Error sending message on the channel : {e}");
break;
}
}
}
Ok(None) => {
@ -223,6 +223,7 @@ pub fn create_quiche_client_thread(
}
Err(e) => {
log::error!("Error recieving message : {e}");
let _ = connection.close(true, 1, b"error recieving");
}
}
}
@ -230,16 +231,15 @@ pub fn create_quiche_client_thread(
loop {
match message_send_queue.try_recv() {
Ok(message_to_send) => {
current_stream_id =
get_next_unidi(current_stream_id, false, maximum_streams);
let binary = Arc::new(
bincode::serialize(&message_to_send)
.expect("Message should be serializable"),
log::debug!(
"sending message: {message_to_send:?} on stream : {stream_id:?}"
);
let binary = message_to_send.to_binary_stream();
log::debug!("finished binary message of length {}", binary.len());
if let Err(e) = send_message(
&mut connection,
&mut partial_responses,
current_stream_id,
&mut stream_sender_map,
stream_id,
binary,
) {
log::error!("Sending failed with error {e:?}");
@ -261,7 +261,7 @@ pub fn create_quiche_client_thread(
}
for stream_id in connection.writable() {
if let Err(e) = handle_writable(&mut connection, &mut partial_responses, stream_id)
if let Err(e) = handle_writable(&mut connection, &mut stream_sender_map, stream_id)
{
if e != quiche::Error::Done {
log::error!("Error writing message on writable stream : {e:?}");
@ -275,9 +275,11 @@ pub fn create_quiche_client_thread(
break;
}
let mut out = vec![0; MAX_PAYLOAD_BUFFER];
loop {
match connection.send(&mut out) {
Ok((write, send_info)) => {
log::debug!("sending :{}", write);
if sender.send((send_info, out[..write].to_vec())).is_err() {
log::error!("client socket thread broken");
break 'client;

View File

@ -7,12 +7,13 @@ use quic_geyser_common::message::Message;
use quic_geyser_common::net::parse_host_port;
use quic_geyser_common::types::connections_parameters::ConnectionParameters;
use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, RecvStream,
SendStream, TokioRuntime, TransportConfig, VarInt,
ClientConfig, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, RecvStream, SendStream,
TokioRuntime, TransportConfig, VarInt,
};
use std::net::UdpSocket;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
pub fn create_client_endpoint(connection_parameters: ConnectionParameters) -> Endpoint {
let mut endpoint = {
@ -66,32 +67,32 @@ pub fn create_client_endpoint(connection_parameters: ConnectionParameters) -> En
endpoint
}
pub async fn recv_message(
mut recv_stream: RecvStream,
timeout_in_seconds: u64,
) -> anyhow::Result<Message> {
let mut buffer = Vec::<u8>::new();
buffer.reserve(128 * 1024); // reserve 128 kbs for each message
// pub async fn recv_message(
// mut recv_stream: RecvStream,
// timeout_in_seconds: u64,
// ) -> anyhow::Result<Message> {
// let mut buffer = Vec::<u8>::new();
// buffer.reserve(128 * 1024); // reserve 128 kbs for each message
while let Some(data) = tokio::time::timeout(
Duration::from_secs(timeout_in_seconds),
recv_stream.read_chunk(DEFAULT_MAX_RECIEVE_WINDOW_SIZE as usize, true),
)
.await??
{
buffer.extend_from_slice(&data.bytes);
}
Ok(bincode::deserialize::<Message>(&buffer)?)
}
// while let Some(data) = tokio::time::timeout(
// Duration::from_secs(timeout_in_seconds),
// recv_stream.read_chunk(DEFAULT_MAX_RECIEVE_WINDOW_SIZE as usize, true),
// )
// .await??
// {
// buffer.extend_from_slice(&data.bytes);
// }
// Ok(bincode::deserialize::<Message>(&buffer)?)
// }
pub struct Client {
connection: Connection,
filter_sender: tokio::sync::mpsc::UnboundedSender<Vec<Filter>>,
}
pub async fn send_message(mut send_stream: SendStream, message: &Message) -> anyhow::Result<()> {
let binary = bincode::serialize(&message)?;
pub async fn send_message(send_stream: &mut SendStream, message: &Message) -> anyhow::Result<()> {
let binary = message.to_binary_stream();
send_stream.write_all(&binary).await?;
send_stream.finish().await?;
send_stream.flush().await?;
Ok(())
}
@ -104,7 +105,6 @@ impl Client {
tokio::sync::mpsc::UnboundedReceiver<Message>,
Vec<tokio::task::JoinHandle<anyhow::Result<()>>>,
)> {
let timeout: u64 = connection_parameters.timeout_in_seconds;
let endpoint = create_client_endpoint(connection_parameters);
let socket_addr = parse_host_port(&server_address)?;
let connecting = endpoint.connect(socket_addr, "quic_geyser_client")?;
@ -116,31 +116,41 @@ impl Client {
let jh1 = {
let connection = connection.clone();
tokio::spawn(async move {
// limit client to respond to 128k streams in parallel
let semaphore = Arc::new(tokio::sync::Semaphore::new(128 * 1024));
loop {
// sender is closed / no messages to send
if message_sx_queue.is_closed() {
bail!("quic client stopped, sender closed");
}
let permit = semaphore.clone().acquire_owned().await.unwrap();
let stream: Result<RecvStream, ConnectionError> = connection.accept_uni().await;
match stream {
Ok(recv_stream) => {
let sender = message_sx_queue.clone();
Ok(mut recv_stream) => {
let message_sx_queue = message_sx_queue.clone();
tokio::spawn(async move {
//
let _permit = permit;
let message = recv_message(recv_stream, timeout).await;
match message {
Ok(message) => {
if let Err(e) = sender.send(message) {
log::error!("Message sent error : {:?}", e);
let mut buffer: Vec<u8> = vec![];
loop {
match recv_stream
.read_chunk(DEFAULT_MAX_RECIEVE_WINDOW_SIZE as usize, true)
.await
{
Ok(Some(chunk)) => {
buffer.extend_from_slice(&chunk.bytes);
while let Some((message, size)) =
Message::from_binary_stream(&buffer)
{
if let Err(e) = message_sx_queue.send(message) {
log::error!("Message sent error : {:?}", e);
break;
}
buffer.drain(..size);
}
}
Ok(None) => {
log::warn!("Chunk none");
}
Err(e) => {
log::debug!("Error getting message {:?}", e);
break;
}
}
Err(e) => {
log::trace!("Error getting message {:?}", e);
}
}
});
@ -163,33 +173,46 @@ impl Client {
})
};
// create a ping thread
// create a ping thread and subscribe thread
let (filter_sender, mut filter_rx) = tokio::sync::mpsc::unbounded_channel();
let jh2 = {
let connection = connection.clone();
tokio::spawn(async move {
let ping_message = bincode::serialize(&Message::Ping)
.expect("ping message should be serializable");
let mut uni_stream = connection.open_uni().await?;
loop {
tokio::select! {
filters = filter_rx.recv() => {
if let Some(filters) = filters {
log::debug!("Sending server filters: {filters:?} on {}", uni_stream.id());
if let Err(e) = send_message(&mut uni_stream, &Message::Filters(filters)).await {
log::error!("Error while sending filters : {e:?}");
}
}
break;
},
_ = tokio::time::sleep(Duration::from_secs(1)) => {
send_message( &mut uni_stream, &Message::Ping).await?;
}
}
}
// keep sending pings
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
if let Ok(mut uni_send_stream) = connection.open_uni().await {
let _ = uni_send_stream.write_all(&ping_message).await;
let _ = uni_send_stream.finish().await;
} else {
// connection closed
if let Err(e) = send_message(&mut uni_stream, &Message::Ping).await {
log::error!("Error while sending ping message : {e:?}");
break;
}
}
bail!("quic client stopped, ping message thread dropped")
Ok(())
})
};
Ok((Client { connection }, message_rx_queue, vec![jh1, jh2]))
Ok((Client { filter_sender }, message_rx_queue, vec![jh1, jh2]))
}
pub async fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
let send_stream = self.connection.open_uni().await?;
send_message(send_stream, &Message::Filters(filters)).await?;
self.filter_sender.send(filters)?;
Ok(())
}
}
@ -254,7 +277,8 @@ mod tests {
#[tokio::test]
pub async fn test_non_blocking_client() {
let server_sock: SocketAddr = parse_host_port("[::]:20000").unwrap();
tracing_subscriber::fmt::init();
let server_sock: SocketAddr = parse_host_port("0.0.0.0:20000").unwrap();
let url = format!("127.0.0.1:{}", server_sock.port());
let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2));
@ -262,7 +286,7 @@ mod tests {
let msg_acc_3 = Message::AccountMsg(get_account_for_test(2, 100));
let msg_acc_4 = Message::AccountMsg(get_account_for_test(3, 1_000));
let msg_acc_5 = Message::AccountMsg(get_account_for_test(4, 10_000));
let msg_acc_6 = Message::AccountMsg(get_account_for_test(4, 100_000_000));
let msg_acc_6 = Message::AccountMsg(get_account_for_test(4, 10_000_000));
let msgs = [
msg_acc_1, msg_acc_2, msg_acc_3, msg_acc_4, msg_acc_5, msg_acc_6,
];

View File

@ -15,6 +15,7 @@ thiserror = {workspace = true}
itertools = { workspace = true }
lz4 = { workspace = true }
bincode = { workspace = true }
circular-buffer = {workspace = true}
[dev-dependencies]
rand = { workspace = true }

View File

@ -1,4 +1,4 @@
pub const DEFAULT_MAX_STREAMS: u64 = 128 * 1024;
pub const DEFAULT_MAX_STREAMS: u64 = 128;
pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 24 * 1024 * 1024; // 24 MBs
pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10;
pub const DEFAULT_MAX_NB_CONNECTIONS: u64 = 10;

View File

@ -6,4 +6,5 @@ pub mod filters;
pub mod message;
pub mod net;
pub mod plugin_error;
pub mod stream_manager;
pub mod types;

View File

@ -10,9 +10,6 @@ use crate::{
},
};
// current maximum message size
pub const MAX_MESSAGE_SIZE: u64 = 20_000_000;
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[repr(C)]
pub enum Message {
@ -24,3 +21,127 @@ pub enum Message {
Filters(Vec<Filter>), // sent from client to server
Ping,
}
impl Message {
// used by the network
pub fn from_binary_stream(stream: &[u8]) -> Option<(Message, usize)> {
if stream.len() < 8 {
return None;
}
let size = u64::from_le_bytes(stream[0..8].try_into().unwrap()) as usize;
if stream.len() < size + 8 {
return None;
}
let message = bincode::deserialize::<Self>(&stream[8..size + 8]).unwrap();
Some((message, size + 8))
}
pub fn to_binary_stream(&self) -> Vec<u8> {
let binary = bincode::serialize(self).unwrap();
let size = binary.len().to_le_bytes();
[size.to_vec(), binary].concat()
}
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use rand::{rngs::ThreadRng, Rng};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use crate::types::{
account::Account,
block_meta::{BlockMeta, SlotMeta},
slot_identifier::SlotIdentifier,
};
use super::Message;
pub fn _create_random_message(rng: &mut ThreadRng) -> Message {
let message_type = rng.gen::<u8>() % 3;
match message_type {
0 => Message::SlotMsg(SlotMeta {
slot: rng.gen(),
parent: rng.gen(),
commitment_config: CommitmentConfig::processed(),
}),
1 => {
let data_length = rng.gen_range(10..128);
let data = (0..data_length).map(|_| rng.gen::<u8>()).collect_vec();
Message::AccountMsg(Account {
slot_identifier: SlotIdentifier { slot: rng.gen() },
pubkey: Pubkey::new_unique(),
owner: Pubkey::new_unique(),
lamports: rng.gen(),
executable: rng.gen_bool(0.5),
rent_epoch: rng.gen(),
write_version: rng.gen(),
data,
compression_type: crate::compression::CompressionType::None,
data_length,
})
}
2 => Message::BlockMetaMsg(BlockMeta {
parent_blockhash: "jfkjahfkajfnaf".to_string(),
parent_slot: rng.gen(),
slot: rng.gen(),
blockhash: "lkjsahkjhakda".to_string(),
block_height: rng.gen(),
rewards: vec![],
entries_count: rng.gen(),
executed_transaction_count: rng.gen(),
block_time: rng.gen(),
}),
_ => {
unreachable!()
}
}
}
#[test]
pub fn check_slot_message_size() {
let message = Message::SlotMsg(SlotMeta {
slot: 73282,
parent: 8392983,
commitment_config: CommitmentConfig::finalized(),
});
let binary = message.to_binary_stream();
assert_eq!(binary.len(), 32);
}
#[test]
pub fn from_to_binary_stream() {
let message = Message::SlotMsg(SlotMeta {
slot: 73282,
parent: 8392983,
commitment_config: CommitmentConfig::finalized(),
});
let binary = message.to_binary_stream();
let (msg_2, _) = Message::from_binary_stream(&binary).unwrap();
assert_eq!(message, msg_2);
let account_data = (0..1000).map(|x: u32| (x % 255) as u8).collect();
let message_account = Message::AccountMsg(Account {
slot_identifier: SlotIdentifier { slot: 938920 },
pubkey: Pubkey::new_unique(),
owner: Pubkey::new_unique(),
lamports: 84782739,
executable: true,
rent_epoch: 849293,
write_version: 9403,
data: account_data,
compression_type: crate::compression::CompressionType::None,
data_length: 1000,
});
let binary_2 = message_account.to_binary_stream();
let total_binary = [binary_2, binary].concat();
assert!(Message::from_binary_stream(&total_binary[..32]).is_none());
let (msg3, size_msg3) = Message::from_binary_stream(&total_binary).unwrap();
assert_eq!(msg3, message_account);
let (msg4, _) = Message::from_binary_stream(&total_binary[size_msg3..]).unwrap();
assert_eq!(msg4, message);
}
}

View File

@ -30,8 +30,6 @@ mod test {
parse_host_port("127.0.0.0:1234").unwrap();
parse_host_port("127.0.0.0").unwrap_err();
parse_host_port("[::]:1234").unwrap();
parse_host_port("fcs-ams1._peer.internal:1234").unwrap();
parse_host_port("fcs-ams1._peer.internal:8172").unwrap();
}
#[test]

View File

@ -0,0 +1,158 @@
pub struct StreamSender<const BUFFER_LEN: usize> {
buffer: Box<circular_buffer::CircularBuffer<BUFFER_LEN, u8>>,
}
#[allow(clippy::new_without_default)]
impl<const BUFFER_LEN: usize> StreamSender<BUFFER_LEN> {
pub fn new() -> StreamSender<BUFFER_LEN> {
StreamSender {
buffer: circular_buffer::CircularBuffer::boxed(),
}
}
pub fn append_bytes(&mut self, bytes: &[u8]) -> bool {
if self.capacity() > bytes.len() {
self.buffer.extend_from_slice(bytes);
true
} else {
// not enough capacity
false
}
}
pub fn as_slices(&self) -> (&[u8], &[u8]) {
self.buffer.as_slices()
}
pub fn consume(&mut self, nb_bytes: usize) -> bool {
if self.buffer.len() < nb_bytes {
return false;
}
let d = self.buffer.drain(..nb_bytes);
assert_eq!(d.len(), nb_bytes);
true
}
pub fn as_buffer(&self) -> Vec<u8> {
let (slice1, slice2) = self.as_slices();
[slice1, slice2].concat()
}
pub fn len(&self) -> usize {
self.buffer.len()
}
pub fn capacity(&self) -> usize {
BUFFER_LEN - self.buffer.len()
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
}
#[cfg(test)]
mod tests {
use circular_buffer::CircularBuffer;
use itertools::Itertools;
use rand::{rngs::ThreadRng, Rng};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use crate::{
message::Message,
types::{
account::Account,
block_meta::{BlockMeta, SlotMeta},
slot_identifier::SlotIdentifier,
},
};
use super::StreamSender;
#[test]
pub fn test_drain_on_circular_buffer() {
let mut buf = CircularBuffer::<6, char>::from_iter("abcdef".chars());
let drained = buf.drain(..3).collect::<Vec<char>>();
assert_eq!(drained, vec!['a', 'b', 'c']);
buf.extend_from_slice(&"ghi".chars().collect_vec());
let (s1, s2) = buf.as_slices();
assert_eq!(s1, vec!['d', 'e', 'f', 'g', 'h', 'i']);
assert!(s2.is_empty());
let drained = buf.drain(..3).collect::<Vec<char>>();
assert_eq!(drained, vec!['d', 'e', 'f']);
}
pub fn create_random_message(rng: &mut ThreadRng) -> Message {
let message_type = rng.gen::<u8>() % 3;
match message_type {
0 => Message::SlotMsg(SlotMeta {
slot: rng.gen(),
parent: rng.gen(),
commitment_config: CommitmentConfig::processed(),
}),
1 => {
let data_length = rng.gen_range(10..128);
let data = (0..data_length).map(|_| rng.gen::<u8>()).collect_vec();
Message::AccountMsg(Account {
slot_identifier: SlotIdentifier { slot: rng.gen() },
pubkey: Pubkey::new_unique(),
owner: Pubkey::new_unique(),
lamports: rng.gen(),
executable: rng.gen_bool(0.5),
rent_epoch: rng.gen(),
write_version: rng.gen(),
data,
compression_type: crate::compression::CompressionType::None,
data_length,
})
}
2 => Message::BlockMetaMsg(BlockMeta {
parent_blockhash: "jfkjahfkajfnaf".to_string(),
parent_slot: rng.gen(),
slot: rng.gen(),
blockhash: "lkjsahkjhakda".to_string(),
block_height: rng.gen(),
rewards: vec![],
entries_count: rng.gen(),
executed_transaction_count: rng.gen(),
block_time: rng.gen(),
}),
_ => {
unreachable!()
}
}
}
#[test]
pub fn create_and_consume_random_messages() {
let mut buffer = StreamSender::<3000>::new();
let mut rng = rand::thread_rng();
let mut messages_appended = vec![];
let mut messages_consumed = vec![];
for _ in 0..1_000_000 {
let do_append = rng.gen_bool(0.6);
if do_append {
let message = create_random_message(&mut rng);
if buffer.append_bytes(&message.to_binary_stream()) {
messages_appended.push(message);
}
} else {
let buf = buffer.as_slices();
if let Some((message, size)) = Message::from_binary_stream(buf.0) {
messages_consumed.push(message);
buffer.consume(size);
}
}
}
while let Some((message, size)) = Message::from_binary_stream(buffer.as_slices().0) {
messages_consumed.push(message);
buffer.consume(size);
}
println!("Added : {} messages", messages_appended.len());
assert_eq!(messages_appended, messages_consumed);
}
}

View File

@ -1,14 +1,11 @@
{
"libpath": "target/debug/libquic_geyser_plugin.so",
"quic_plugin": {
"address": "0.0.0.0:10800",
"address": "[::]:10800",
"compression_parameters": {
"compression_type": {
"Lz4Fast": 8
}
}
},
"rpc_server" : {
"enable" : true
}
}

View File

@ -144,7 +144,7 @@ fn blocking(args: Args, client_stats: ClientStats, break_thread: Arc<AtomicBool>
std::thread::spawn(move || {
let _client = client;
while let Ok(message) = reciever.recv() {
let message_size = bincode::serialize(&message).unwrap().len();
let message_size = message.to_binary_stream().len();
client_stats
.bytes_transfered
.fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed);
@ -242,21 +242,21 @@ async fn non_blocking(args: Args, client_stats: ClientStats, break_thread: Arc<A
.unwrap();
println!("Connected");
tokio::time::sleep(Duration::from_secs(1)).await;
let mut filters = vec![Filter::Slot, Filter::BlockMeta];
if args.blocks_instead_of_accounts {
filters.push(Filter::BlockAll);
} else {
filters.push(Filter::AccountsAll);
}
// let mut filters = vec![Filter::Slot, Filter::BlockMeta];
// if args.blocks_instead_of_accounts {
// filters.push(Filter::BlockAll);
// } else {
// filters.push(Filter::AccountsAll);
// }
sleep(Duration::from_secs(5));
println!("Subscribing");
client.subscribe(filters).await.unwrap();
client.subscribe(vec![Filter::AccountsAll]).await.unwrap();
println!("Subscribed");
tokio::spawn(async move {
while let Some(message) = reciever.recv().await {
let message_size = bincode::serialize(&message).unwrap().len();
let message_size = message.to_binary_stream().len();
client_stats
.bytes_transfered
.fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed);

View File

@ -1,51 +1,74 @@
use std::collections::HashMap;
use anyhow::bail;
use quic_geyser_common::{
defaults::MAX_DATAGRAM_SIZE, message::Message, stream_manager::StreamSender,
};
use std::collections::BTreeMap;
use quic_geyser_common::{defaults::MAX_DATAGRAM_SIZE, message::Message};
pub fn convert_binary_to_message(bytes: Vec<u8>) -> anyhow::Result<Message> {
Ok(bincode::deserialize::<Message>(&bytes)?)
}
pub type ReadStreams = HashMap<u64, Vec<u8>>;
const BUFFER_SIZE: usize = 32 * 1024 * 1024;
pub type ReadStreams = BTreeMap<u64, StreamSender<BUFFER_SIZE>>;
pub fn recv_message(
connection: &mut quiche::Connection,
read_streams: &mut ReadStreams,
stream_id: u64,
) -> anyhow::Result<Option<Message>> {
let mut total_buf = match read_streams.remove(&stream_id) {
Some(buf) => buf,
None => vec![],
};
loop {
let mut buf = [0; MAX_DATAGRAM_SIZE];
match connection.stream_recv(stream_id, &mut buf) {
Ok((read, fin)) => {
log::trace!("read {} on stream {}", read, stream_id);
total_buf.extend_from_slice(&buf[..read]);
if fin {
log::trace!("fin stream : {}", stream_id);
match bincode::deserialize::<Message>(&total_buf) {
Ok(message) => return Ok(Some(message)),
Err(e) => {
bail!("Error deserializing stream {stream_id} error: {e:?}");
}
}
) -> anyhow::Result<Option<Vec<Message>>> {
let mut buf = [0; MAX_DATAGRAM_SIZE];
if let Some(total_buf) = read_streams.get_mut(&stream_id) {
loop {
match connection.stream_recv(stream_id, &mut buf) {
Ok((read, _)) => {
log::trace!("read {} on stream {}", read, stream_id);
total_buf.append_bytes(&buf[..read]);
}
}
Err(e) => {
match &e {
Err(e) => match &e {
quiche::Error::Done => {
// will be tried again later
read_streams.insert(stream_id, total_buf);
return Ok(None);
let mut messages = vec![];
if let Some((message, size)) =
Message::from_binary_stream(total_buf.as_slices().0)
{
total_buf.consume(size);
messages.push(message);
}
return Ok(if messages.is_empty() {
None
} else {
Some(messages)
});
}
_ => {
bail!("read error on stream : {}, error: {}", stream_id, e);
}
},
}
}
} else {
let mut total_buf = StreamSender::<BUFFER_SIZE>::new();
loop {
match connection.stream_recv(stream_id, &mut buf) {
Ok((read, _)) => {
log::trace!("read {} on stream {}", read, stream_id);
total_buf.append_bytes(&buf[..read]);
}
Err(e) => match &e {
quiche::Error::Done => {
let mut messages = vec![];
if let Some((message, size)) =
Message::from_binary_stream(total_buf.as_slices().0)
{
total_buf.consume(size);
messages.push(message);
}
read_streams.insert(stream_id, total_buf);
return Ok(if messages.is_empty() {
None
} else {
Some(messages)
});
}
_ => {
bail!("read error on stream : {}, error: {}", stream_id, e);
}
},
}
}
}

View File

@ -1,40 +1,50 @@
use std::sync::Arc;
use crate::quiche_utils::{PartialResponse, PartialResponses};
use prometheus::{opts, register_int_gauge, IntGauge};
use quic_geyser_common::message::Message;
use crate::quiche_utils::{StreamSenderMap, StreamSenderWithDefaultCapacity};
use quiche::Connection;
lazy_static::lazy_static!(
static ref NUMBER_OF_PARTIAL_RESPONSES: IntGauge =
register_int_gauge!(opts!("quic_plugin_nb_streams_open", "Number of streams that are open")).unwrap();
);
pub fn convert_to_binary(message: &Message) -> anyhow::Result<Vec<u8>> {
Ok(bincode::serialize(&message)?)
}
// return if connection has finished writing
pub fn send_message(
connection: &mut Connection,
partial_responses: &mut PartialResponses,
stream_sender_map: &mut StreamSenderMap,
stream_id: u64,
message: Arc<Vec<u8>>,
mut message: Vec<u8>,
) -> std::result::Result<(), quiche::Error> {
let written = match connection.stream_send(stream_id, &message, true) {
Ok(v) => v,
Err(quiche::Error::Done) => 0,
Err(e) => {
return Err(e);
if let Some(stream_sender) = stream_sender_map.get_mut(&stream_id) {
if stream_sender.is_empty() {
let written = match connection.stream_send(stream_id, &message, false) {
Ok(v) => v,
Err(quiche::Error::Done) => 0,
Err(e) => {
return Err(e);
}
};
log::debug!("dispatched {} on stream id : {}", written, stream_id);
if written < message.len() {
log::debug!("appending bytes : {}", message.len() - written);
message.drain(..written);
if !stream_sender.append_bytes(&message) {
return Err(quiche::Error::BufferTooShort);
}
}
} else if !stream_sender.append_bytes(&message) {
return Err(quiche::Error::BufferTooShort);
}
};
log::trace!("dispatched {} on stream id : {}", written, stream_id);
if written < message.len() {
let response = PartialResponse { message, written };
NUMBER_OF_PARTIAL_RESPONSES.inc();
partial_responses.insert(stream_id, response);
} else {
let written = match connection.stream_send(stream_id, &message, false) {
Ok(v) => v,
Err(quiche::Error::Done) => 0,
Err(e) => {
return Err(e);
}
};
log::debug!("dispatched {} on stream id : {}", written, stream_id);
log::debug!("Creating new streambuffer : {}", message.len() - written);
message.drain(..written);
let mut new_stream_sender = StreamSenderWithDefaultCapacity::new();
if !new_stream_sender.append_bytes(&message) {
return Err(quiche::Error::BufferTooShort);
}
stream_sender_map.insert(stream_id, new_stream_sender);
}
Ok(())
}
@ -42,46 +52,31 @@ pub fn send_message(
/// Handles newly writable streams.
pub fn handle_writable(
conn: &mut quiche::Connection,
partial_responses: &mut PartialResponses,
stream_sender_map: &mut StreamSenderMap,
stream_id: u64,
) -> std::result::Result<(), quiche::Error> {
let resp = match partial_responses.get_mut(&stream_id) {
Some(s) => s,
None => {
if let Err(e) = conn.stream_shutdown(stream_id, quiche::Shutdown::Write, 0) {
log::error!("error shutting down stream {stream_id:?}, error :{e}");
if let Some(stream_sender) = stream_sender_map.get_mut(&stream_id) {
let (s1, _s2) = stream_sender.as_slices();
if !s1.is_empty() {
match conn.stream_send(stream_id, s1, false) {
Ok(written) => {
if written > 0 {
stream_sender.consume(written);
}
}
Err(quiche::Error::Done) => {
// above
return Err(quiche::Error::Done);
}
Err(e) => {
log::error!(
"{} stream id :{stream_id} send failed {e:?}",
conn.trace_id()
);
return Err(e);
}
}
return Ok(());
}
};
let written = match conn.stream_send(stream_id, &resp.message[resp.written..], true) {
Ok(v) => v,
Err(quiche::Error::Done) => {
// above
return Err(quiche::Error::Done);
}
Err(e) => {
NUMBER_OF_PARTIAL_RESPONSES.dec();
partial_responses.remove(&stream_id);
log::error!(
"{} stream id :{stream_id} send failed {e:?}",
conn.trace_id()
);
return Err(e);
}
};
if written == 0 {
return Ok(());
}
if resp.written + written == resp.message.len() {
NUMBER_OF_PARTIAL_RESPONSES.dec();
partial_responses.remove(&stream_id);
} else {
resp.written += written;
}
Ok(())
}

View File

@ -1,6 +1,6 @@
use std::{collections::BTreeMap, sync::Arc};
use quic_geyser_common::stream_manager::StreamSender;
use ring::rand::SecureRandom;
use std::collections::BTreeMap;
pub fn validate_token<'a>(
src: &std::net::SocketAddr,
@ -162,9 +162,7 @@ pub fn generate_cid_and_reset_token<T: SecureRandom>(
(scid, reset_token)
}
pub struct PartialResponse {
pub message: Arc<Vec<u8>>,
pub written: usize,
}
pub type PartialResponses = BTreeMap<u64, PartialResponse>;
// 16 MB per buffer
pub const BUFFER_LEN: usize = 32 * 1024 * 1024;
pub type StreamSenderWithDefaultCapacity = StreamSender<BUFFER_LEN>;
pub type StreamSenderMap = BTreeMap<u64, StreamSenderWithDefaultCapacity>;

View File

@ -1,13 +1,13 @@
use std::{
collections::HashMap,
net::SocketAddr,
net::UdpSocket,
net::{SocketAddr, UdpSocket},
sync::{
atomic::AtomicBool,
mpsc::{self, Sender},
Arc, Mutex, RwLock,
},
time::{Duration, Instant},
time::Duration,
u64,
};
use anyhow::bail;
@ -32,7 +32,7 @@ use quic_geyser_quiche_utils::{
quiche_sender::{handle_writable, send_message},
quiche_utils::{
generate_cid_and_reset_token, get_next_unidi, handle_path_events, mint_token,
validate_token, PartialResponses,
validate_token, StreamSenderMap,
},
};
@ -75,7 +75,7 @@ use crate::configure_server::configure_server;
enum InternalMessage {
Packet(quiche::RecvInfo, Vec<u8>),
ClientMessage(Arc<Vec<u8>>, u8),
ClientMessage(Vec<u8>, u8),
}
struct DispatchingData {
@ -92,7 +92,7 @@ pub fn server_loop(
compression_type: CompressionType,
stop_laggy_client: bool,
) -> anyhow::Result<()> {
let maximum_concurrent_streams_id = u64::MAX;
let maximum_concurrent_streams = 32;
let mut config = configure_server(quic_params)?;
let socket = Arc::new(UdpSocket::bind(socket_addr)?);
@ -165,7 +165,12 @@ pub fn server_loop(
if !clients_lk.contains_key(&hdr.dcid) && !clients_lk.contains_key(&conn_id) {
drop(clients_lk);
if hdr.ty != quiche::Type::Initial {
log::error!("Packet is not Initial");
log::error!(
"Packet is not Initial : {:?} for dicd : {:?} and connection_id: {:?}",
hdr.ty,
&hdr.dcid,
conn_id
);
continue;
}
@ -257,7 +262,7 @@ pub fn server_loop(
clients_by_id.clone(),
client_message_rx,
filters.clone(),
maximum_concurrent_streams_id,
maximum_concurrent_streams,
stop_laggy_client,
quic_params.incremental_priority,
rng.clone(),
@ -317,7 +322,7 @@ fn create_client_task(
client_id_by_scid: Arc<Mutex<HashMap<ConnectionId<'static>, u64>>>,
receiver: mpsc::Receiver<InternalMessage>,
filters: Arc<RwLock<Vec<Filter>>>,
maximum_concurrent_streams_id: u64,
maximum_concurrent_streams: usize,
stop_laggy_client: bool,
incremental_priority: bool,
rng: SystemRandom,
@ -325,11 +330,11 @@ fn create_client_task(
enable_gso: bool,
) {
std::thread::spawn(move || {
let mut partial_responses = PartialResponses::new();
let mut stream_sender_map = StreamSenderMap::new();
let mut read_streams = ReadStreams::new();
let mut next_stream: u64 = 3;
let first_stream_id = get_next_unidi(3, true, u64::MAX);
let mut next_stream: u64 = first_stream_id;
let mut connection = connection;
let mut instance = Instant::now();
let mut closed = false;
let mut out = [0; 65535];
let mut datagram_size = MAX_DATAGRAM_SIZE;
@ -353,6 +358,7 @@ fn create_client_task(
match internal_message {
InternalMessage::Packet(info, mut buf) => {
log::debug!("got packet : {}", buf.len());
// handle packet from udp socket
let buf = buf.as_mut_slice();
match connection.recv(buf, info) {
@ -364,38 +370,58 @@ fn create_client_task(
};
}
InternalMessage::ClientMessage(message, priority) => {
log::debug!("got message : {}", message.len());
if closed {
// connection is already closed
continue;
}
// handle message from client
let stream_id = next_stream;
next_stream =
get_next_unidi(stream_id, true, maximum_concurrent_streams_id);
let close = if let Err(e) =
connection.stream_priority(stream_id, priority, incremental_priority)
{
if !closed {
log::error!(
"Unable to set priority for the stream {}, error {}",
stream_id,
e
);
// create new stream is there are still streams to be used or else use the existing one with most capacity
let stream_id = if stream_sender_map.len() < maximum_concurrent_streams {
let stream_id_to_use = next_stream;
next_stream = get_next_unidi(stream_id_to_use, true, u64::MAX);
log::debug!("Creating new stream to use :{stream_id_to_use}");
if stream_id_to_use == first_stream_id {
// set high priority to first stream
connection
.stream_priority(stream_id_to_use, 0, incremental_priority)
.unwrap();
} else {
connection
.stream_priority(stream_id_to_use, 1, incremental_priority)
.unwrap();
}
true
stream_id_to_use
} else {
match send_message(
&mut connection,
&mut partial_responses,
stream_id,
message,
) {
Ok(_) => {
// do nothing
false
}
Err(e) => {
// done writing / queue is full
log::error!("got error sending message client : {}", e);
true
}
// for high priority streams
let stream_id = if priority == 0 {
first_stream_id
} else {
let value = stream_sender_map
.iter()
.max_by(|x, y| x.1.capacity().cmp(&y.1.capacity()))
.unwrap()
.0;
*value
};
log::debug!("Reusing stream {stream_id}");
stream_id
};
let close = match send_message(
&mut connection,
&mut stream_sender_map,
stream_id,
message,
) {
Ok(_) => {
// do nothing
false
}
Err(e) => {
// done writing / queue is full
log::error!("got error sending message client : {}", e);
true
}
};
@ -424,21 +450,27 @@ fn create_client_task(
for stream in connection.readable() {
let message = recv_message(&mut connection, &mut read_streams, stream);
match message {
Ok(Some(message)) => match message {
Message::Filters(mut f) => {
let mut filter_lk = filters.write().unwrap();
filter_lk.append(&mut f);
Ok(Some(messages)) => {
let mut filter_lk = filters.write().unwrap();
for message in messages {
match message {
Message::Filters(mut f) => {
filter_lk.append(&mut f);
}
Message::Ping => {
log::debug!("recieved ping from the client");
}
_ => {
log::error!("unknown message from the client");
}
}
}
Message::Ping => {
log::debug!("recieved ping from the client");
}
_ => {
log::error!("unknown message from the client");
}
},
}
Ok(None) => {}
Err(e) => {
log::error!("Error recieving message : {e}")
log::error!("Error recieving message : {e}");
// missed the message close the connection
let _ = connection.close(true, 0, b"recv error");
}
}
}
@ -450,7 +482,7 @@ fn create_client_task(
datagram_size = connection.max_send_udp_payload_size();
for stream_id in connection.writable() {
if let Err(e) =
handle_writable(&mut connection, &mut partial_responses, stream_id)
handle_writable(&mut connection, &mut stream_sender_map, stream_id)
{
if e == quiche::Error::Done {
break;
@ -465,27 +497,23 @@ fn create_client_task(
}
}
if instance.elapsed() > Duration::from_secs(1) {
log::debug!("other tasks");
instance = Instant::now();
handle_path_events(&mut connection);
handle_path_events(&mut connection);
// See whether source Connection IDs have been retired.
while let Some(retired_scid) = connection.retired_scid_next() {
log::info!("Retiring source CID {:?}", retired_scid);
client_id_by_scid.lock().unwrap().remove(&retired_scid);
}
// Provides as many CIDs as possible.
while connection.scids_left() > 0 {
let (scid, reset_token) = generate_cid_and_reset_token(&rng);
log::info!("providing new scid {scid:?}");
if connection.new_scid(&scid, reset_token, false).is_err() {
break;
}
client_id_by_scid.lock().unwrap().insert(scid, client_id);
// See whether source Connection IDs have been retired.
while let Some(retired_scid) = connection.retired_scid_next() {
log::info!("Retiring source CID {:?}", retired_scid);
client_id_by_scid.lock().unwrap().remove(&retired_scid);
}
// Provides as many CIDs as possible.
while connection.scids_left() > 0 {
let (scid, reset_token) = generate_cid_and_reset_token(&rng);
log::info!("providing new scid {scid:?}");
if connection.new_scid(&scid, reset_token, false).is_err() {
break;
}
client_id_by_scid.lock().unwrap().insert(scid, client_id);
}
let mut send_message_to = None;
@ -627,12 +655,12 @@ fn create_dispatching_thread(
parent,
commitment_config,
}),
1,
0,
)
}
ChannelMessage::BlockMeta(block_meta) => {
NUMBER_OF_BLOCKMETA_UPDATE.inc();
(Message::BlockMetaMsg(block_meta), 2)
(Message::BlockMetaMsg(block_meta), 0)
}
ChannelMessage::Transaction(transaction) => {
NUMBER_OF_TRANSACTION_UPDATES.inc();
@ -643,9 +671,7 @@ fn create_dispatching_thread(
(Message::BlockMsg(block), 2)
}
};
let binary = Arc::new(
bincode::serialize(&message).expect("Message should be serializable in binary"),
);
let binary = message.to_binary_stream();
for id in dispatching_connections.iter() {
let data = dispatching_connections_lk.get(id).unwrap();
if data