From 38e70b06de1e6fca86b43e160a2688fc56838cd4 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Fri, 17 May 2024 15:46:38 +0200 Subject: [PATCH] Making all transfered types repr(C), renaming dispatch --- client/src/client.rs | 2 +- common/src/compression.rs | 1 + common/src/filters.rs | 2 ++ common/src/message.rs | 1 + common/src/quic/connection_manager.rs | 2 +- common/src/quic/quic_server.rs | 8 ++++---- common/src/quic/quinn_reciever.rs | 2 +- common/src/types/account.rs | 1 + common/src/types/block_meta.rs | 2 ++ common/src/types/connections_parameters.rs | 1 + common/src/types/slot_identifier.rs | 1 + common/src/types/transaction.rs | 2 ++ 12 files changed, 18 insertions(+), 7 deletions(-) diff --git a/client/src/client.rs b/client/src/client.rs index e4f5744..9ee5bc8 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -148,7 +148,7 @@ mod tests { notify_server_start.notify_one(); notify_subscription.notified().await; for msg in msgs { - connection_manager.dispach(msg, 10).await; + connection_manager.dispatch(msg, 10).await; } }); } diff --git a/common/src/compression.rs b/common/src/compression.rs index 6f0ae61..0927ecf 100644 --- a/common/src/compression.rs +++ b/common/src/compression.rs @@ -1,6 +1,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +#[repr(C)] pub enum CompressionType { None, Lz4Fast(u32), diff --git a/common/src/filters.rs b/common/src/filters.rs index 3dee9fb..8447eab 100644 --- a/common/src/filters.rs +++ b/common/src/filters.rs @@ -6,6 +6,7 @@ use solana_sdk::{pubkey::Pubkey, signature::Signature}; use crate::message::Message; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[repr(C)] pub enum Filter { Account(AccountFilter), Slot, @@ -39,6 +40,7 @@ impl Filter { // setting owner to 11111111111111111111111111111111 will subscribe to all the accounts #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[repr(C)] pub struct AccountFilter { pub owner: Option, pub accounts: Option>, diff --git a/common/src/message.rs b/common/src/message.rs index 10f7ebf..9dae047 100644 --- a/common/src/message.rs +++ b/common/src/message.rs @@ -11,6 +11,7 @@ use crate::{ }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[repr(C)] pub enum Message { AccountMsg(Account), SlotMsg(SlotMeta), diff --git a/common/src/quic/connection_manager.rs b/common/src/quic/connection_manager.rs index 060e82e..b99c2ab 100644 --- a/common/src/quic/connection_manager.rs +++ b/common/src/quic/connection_manager.rs @@ -187,7 +187,7 @@ impl ConnectionManager { }); } - pub async fn dispach(&self, message: Message, retry_count: u64) { + pub async fn dispatch(&self, message: Message, retry_count: u64) { let lk = self.connections.read().await; for connection_data in lk.iter() { diff --git a/common/src/quic/quic_server.rs b/common/src/quic/quic_server.rs index f8aa8ce..207d840 100644 --- a/common/src/quic/quic_server.rs +++ b/common/src/quic/quic_server.rs @@ -93,15 +93,15 @@ impl QuicServer { parent, commitment_level, }); - quic_connection_manager.dispach(message, retry_count).await; + quic_connection_manager.dispatch(message, retry_count).await; } ChannelMessage::BlockMeta(block_meta) => { let message = Message::BlockMetaMsg(block_meta); - quic_connection_manager.dispach(message, retry_count).await; + quic_connection_manager.dispatch(message, retry_count).await; } ChannelMessage::Transaction(transaction) => { let message = Message::TransactionMsg(transaction); - quic_connection_manager.dispach(message, retry_count).await; + quic_connection_manager.dispatch(message, retry_count).await; } } } @@ -140,6 +140,6 @@ fn process_account_message( ); let message = Message::AccountMsg(geyser_account); - quic_connection_manager.dispach(message, retry_count).await; + quic_connection_manager.dispatch(message, retry_count).await; }); } diff --git a/common/src/quic/quinn_reciever.rs b/common/src/quic/quinn_reciever.rs index 8c2badd..8bb8738 100644 --- a/common/src/quic/quinn_reciever.rs +++ b/common/src/quic/quinn_reciever.rs @@ -27,7 +27,7 @@ pub async fn recv_message( let size_bytes: [u8; 8] = size_bytes.try_into().unwrap(); let size = u64::from_le_bytes(size_bytes) as usize; let mut buffer: Vec = vec![0; size]; - while let Some(data) = recv_stream.read_chunk(size, false).await? { + while let Some(data) = tokio::time::timeout(Duration::from_secs(1), recv_stream.read_chunk(size, false)).await?? { let bytes = data.bytes.to_vec(); let offset = data.offset - 8; let begin_offset = offset as usize; diff --git a/common/src/types/account.rs b/common/src/types/account.rs index f1b0b42..d289b56 100644 --- a/common/src/types/account.rs +++ b/common/src/types/account.rs @@ -6,6 +6,7 @@ use crate::compression::CompressionType; use super::slot_identifier::SlotIdentifier; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)] +#[repr(C)] pub struct Account { pub slot_identifier: SlotIdentifier, pub pubkey: Pubkey, diff --git a/common/src/types/block_meta.rs b/common/src/types/block_meta.rs index 739ab25..fcefd0b 100644 --- a/common/src/types/block_meta.rs +++ b/common/src/types/block_meta.rs @@ -3,6 +3,7 @@ use solana_sdk::commitment_config::CommitmentLevel; use solana_transaction_status::Reward; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[repr(C)] pub struct SlotMeta { pub slot: u64, pub parent: u64, @@ -10,6 +11,7 @@ pub struct SlotMeta { } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[repr(C)] pub struct BlockMeta { pub parent_slot: u64, pub slot: u64, diff --git a/common/src/types/connections_parameters.rs b/common/src/types/connections_parameters.rs index 527241f..d14f04a 100644 --- a/common/src/types/connections_parameters.rs +++ b/common/src/types/connections_parameters.rs @@ -5,6 +5,7 @@ use crate::quic::configure_client::{ }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[repr(C)] pub struct ConnectionParameters { pub max_number_of_streams: u32, pub streams_for_slot_data: u32, diff --git a/common/src/types/slot_identifier.rs b/common/src/types/slot_identifier.rs index f98950d..4ab3e46 100644 --- a/common/src/types/slot_identifier.rs +++ b/common/src/types/slot_identifier.rs @@ -1,6 +1,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[repr(C)] pub struct SlotIdentifier { pub slot: u64, } diff --git a/common/src/types/transaction.rs b/common/src/types/transaction.rs index 2d9b167..6bdef73 100644 --- a/common/src/types/transaction.rs +++ b/common/src/types/transaction.rs @@ -10,6 +10,7 @@ use solana_transaction_status::{InnerInstructions, Rewards}; use super::slot_identifier::SlotIdentifier; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[repr(C)] pub struct TransactionMeta { pub error: Option, pub fee: u64, @@ -24,6 +25,7 @@ pub struct TransactionMeta { } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[repr(C)] pub struct Transaction { pub slot_identifier: SlotIdentifier, pub signatures: Vec,