From a8863cd74d6956b60888cd6a6e2bf0a27b61b9d9 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 2 Aug 2023 15:22:59 +0200 Subject: [PATCH] code format lite-rpc --- core/src/lib.rs | 2 +- core/src/proxy_request_format.rs | 32 ++-- core/src/quic_connection_utils.rs | 9 +- core/src/solana_utils.rs | 6 +- lite-rpc/src/bridge.rs | 2 +- lite-rpc/src/main.rs | 14 +- services/src/tpu_utils/mod.rs | 6 +- .../quic_proxy_connection_manager.rs | 158 ++++++++---------- .../src/tpu_utils/quinn_auto_reconnect.rs | 19 +-- services/src/tpu_utils/tpu_connection_path.rs | 5 +- services/src/tpu_utils/tpu_service.rs | 64 +++---- services/src/transaction_service.rs | 2 +- ...literpc_tpu_quic_server_integrationtest.rs | 1 - 13 files changed, 157 insertions(+), 163 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index 06060784..97da6f71 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -2,6 +2,7 @@ pub mod block_processor; pub mod block_store; pub mod leader_schedule; pub mod notifications; +pub mod proxy_request_format; pub mod quic_connection; pub mod quic_connection_utils; pub mod rotating_queue; @@ -10,6 +11,5 @@ pub mod structures; pub mod subscription_handler; pub mod subscription_sink; pub mod tx_store; -pub mod proxy_request_format; pub type AnyhowJoinHandle = tokio::task::JoinHandle>; diff --git a/core/src/proxy_request_format.rs b/core/src/proxy_request_format.rs index 53ddc8a3..8970becd 100644 --- a/core/src/proxy_request_format.rs +++ b/core/src/proxy_request_format.rs @@ -1,10 +1,10 @@ -use std::fmt; -use std::fmt::Display; -use std::net::{SocketAddr}; use anyhow::Context; use serde::{Deserialize, Serialize}; use solana_sdk::pubkey::Pubkey; use solana_sdk::transaction::VersionedTransaction; +use std::fmt; +use std::fmt::Display; +use std::net::SocketAddr; /// /// lite-rpc to proxy wire format @@ -22,14 +22,22 @@ pub struct TpuForwardingRequest { impl Display for TpuForwardingRequest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "TpuForwardingRequest for tpu target {} with identity {}: payload {} tx", - &self.get_tpu_socket_addr(), &self.get_identity_tpunode(), &self.get_transactions().len()) + write!( + f, + "TpuForwardingRequest for tpu target {} with identity {}: payload {} tx", + &self.get_tpu_socket_addr(), + &self.get_identity_tpunode(), + &self.get_transactions().len() + ) } } impl TpuForwardingRequest { - pub fn new(tpu_socket_addr: SocketAddr, identity_tpunode: Pubkey, - transactions: Vec) -> Self { + pub fn new( + tpu_socket_addr: SocketAddr, + identity_tpunode: Pubkey, + transactions: Vec, + ) -> Self { TpuForwardingRequest { format_version: FORMAT_VERSION1, tpu_socket_addr, @@ -38,13 +46,12 @@ impl TpuForwardingRequest { } } - pub fn serialize_wire_format( - &self) -> Vec { + pub fn serialize_wire_format(&self) -> Vec { bincode::serialize(&self).expect("Expect to serialize transactions") } - pub fn deserialize_from_raw_request(raw_proxy_request: &Vec) -> TpuForwardingRequest { - let request = bincode::deserialize::(&raw_proxy_request) + pub fn deserialize_from_raw_request(raw_proxy_request: &[u8]) -> TpuForwardingRequest { + let request = bincode::deserialize::(raw_proxy_request) .context("deserialize proxy request") .unwrap(); @@ -65,6 +72,3 @@ impl TpuForwardingRequest { self.transactions.clone() } } - - - diff --git a/core/src/quic_connection_utils.rs b/core/src/quic_connection_utils.rs index 4c79c48f..c4907e4c 100644 --- a/core/src/quic_connection_utils.rs +++ b/core/src/quic_connection_utils.rs @@ -226,7 +226,10 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification { // rtt=1.08178ms pub fn connection_stats(connection: &Connection) -> String { // see https://www.rfc-editor.org/rfc/rfc9000.html#name-frame-types-and-formats - format!("stable_id {}, rtt={:?}, stats {:?}", - connection.stable_id(), connection.stats().path.rtt, connection.stats().frame_rx) + format!( + "stable_id {}, rtt={:?}, stats {:?}", + connection.stable_id(), + connection.stats().path.rtt, + connection.stats().frame_rx + ) } - diff --git a/core/src/solana_utils.rs b/core/src/solana_utils.rs index 8b9a5b27..d875e90c 100644 --- a/core/src/solana_utils.rs +++ b/core/src/solana_utils.rs @@ -1,7 +1,9 @@ use crate::structures::identity_stakes::IdentityStakes; use anyhow::Context; use log::info; +use serde::Serialize; use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_streamer::nonblocking::quic::ConnectionPeerType; use std::{ @@ -12,11 +14,9 @@ use std::{ }, time::Duration, }; -use serde::Serialize; -use solana_sdk::hash::Hash; use solana_sdk::signature::Signature; -use solana_sdk::transaction::{Transaction, uses_durable_nonce, VersionedTransaction}; +use solana_sdk::transaction::{uses_durable_nonce, Transaction, VersionedTransaction}; use tokio::sync::mpsc::UnboundedReceiver; const AVERAGE_SLOT_CHANGE_TIME_IN_MILLIS: u64 = 400; diff --git a/lite-rpc/src/bridge.rs b/lite-rpc/src/bridge.rs index fa3c4ba2..6fc2f51a 100644 --- a/lite-rpc/src/bridge.rs +++ b/lite-rpc/src/bridge.rs @@ -39,11 +39,11 @@ use solana_sdk::{ use solana_transaction_status::TransactionStatus; use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration}; +use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath; use tokio::{ net::ToSocketAddrs, sync::mpsc::{self, Sender}, }; -use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath; lazy_static::lazy_static! { static ref RPC_SEND_TX: IntCounter = diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index fd0d415c..ab1e9182 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -7,11 +7,11 @@ use clap::Parser; use dotenv::dotenv; use lite_rpc::{bridge::LiteBridge, cli::Args}; +use clap::builder::TypedValueParser; +use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath; use solana_sdk::signature::Keypair; use std::env; use std::sync::Arc; -use clap::builder::TypedValueParser; -use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath; use crate::rpc_tester::RpcTester; @@ -66,7 +66,7 @@ pub async fn start_lite_rpc(args: Args) -> anyhow::Result<()> { validator_identity, retry_after, maximum_retries_per_tx, - tpu_connection_path + tpu_connection_path, ) .await .context("Error building LiteBridge")? @@ -123,12 +123,14 @@ pub async fn main() -> anyhow::Result<()> { } } -fn configure_tpu_connection_path(experimental_quic_proxy_addr: Option) -> TpuConnectionPath { +fn configure_tpu_connection_path( + experimental_quic_proxy_addr: Option, +) -> TpuConnectionPath { match experimental_quic_proxy_addr { None => TpuConnectionPath::QuicDirectPath, Some(prox_address) => TpuConnectionPath::QuicForwardProxyPath { // e.g. "127.0.0.1:11111" - forward_proxy_address: prox_address.parse().unwrap() + forward_proxy_address: prox_address.parse().unwrap(), }, } -} \ No newline at end of file +} diff --git a/services/src/tpu_utils/mod.rs b/services/src/tpu_utils/mod.rs index 24e94707..c32b8deb 100644 --- a/services/src/tpu_utils/mod.rs +++ b/services/src/tpu_utils/mod.rs @@ -1,8 +1,6 @@ pub mod tpu_service; -pub mod tpu_connection_path; -pub mod tpu_connection_manager; pub mod quic_proxy_connection_manager; pub mod quinn_auto_reconnect; - - +pub mod tpu_connection_manager; +pub mod tpu_connection_path; diff --git a/services/src/tpu_utils/quic_proxy_connection_manager.rs b/services/src/tpu_utils/quic_proxy_connection_manager.rs index d86acd22..8bcfa89b 100644 --- a/services/src/tpu_utils/quic_proxy_connection_manager.rs +++ b/services/src/tpu_utils/quic_proxy_connection_manager.rs @@ -1,28 +1,27 @@ - use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::Ordering::Relaxed; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use anyhow::bail; use std::time::Duration; -use anyhow::{bail}; -use futures::FutureExt; use itertools::Itertools; use log::{debug, error, info, trace}; -use quinn::{ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt}; -use solana_sdk::packet::PACKET_DATA_SIZE; +use quinn::{ + ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt, +}; use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::Keypair; use solana_sdk::transaction::VersionedTransaction; use tokio::sync::{broadcast::Receiver, broadcast::Sender, RwLock}; use tokio::time::timeout; use solana_lite_rpc_core::proxy_request_format::TpuForwardingRequest; -use solana_lite_rpc_core::quic_connection_utils::{connection_stats, QuicConnectionParameters, SkipServerVerification}; - +use solana_lite_rpc_core::quic_connection_utils::{ + connection_stats, QuicConnectionParameters, SkipServerVerification, +}; use crate::tpu_utils::quinn_auto_reconnect::AutoReconnect; @@ -36,7 +35,7 @@ pub struct QuicProxyConnectionManager { endpoint: Endpoint, simple_thread_started: AtomicBool, proxy_addr: SocketAddr, - current_tpu_nodes: Arc>> + current_tpu_nodes: Arc>>, } const CHUNK_SIZE_PER_STREAM: usize = 20; @@ -48,7 +47,7 @@ impl QuicProxyConnectionManager { proxy_addr: SocketAddr, ) -> Self { info!("Configure Quic proxy connection manager to {}", proxy_addr); - let endpoint = Self::create_proxy_client_endpoint(certificate.clone(), key.clone()); + let endpoint = Self::create_proxy_client_endpoint(certificate, key); Self { endpoint, @@ -65,21 +64,24 @@ impl QuicProxyConnectionManager { connections_to_keep: HashMap, connection_parameters: QuicConnectionParameters, ) { - debug!("reconfigure quic proxy connection (# of tpu nodes: {})", connections_to_keep.len()); + debug!( + "reconfigure quic proxy connection (# of tpu nodes: {})", + connections_to_keep.len() + ); { - let list_of_nodes = connections_to_keep.iter().map(|(identity, tpu_address)| { - TpuNode { - tpu_identity: identity.clone(), - tpu_address: tpu_address.clone(), - } - }).collect_vec(); + let list_of_nodes = connections_to_keep + .iter() + .map(|(identity, tpu_address)| TpuNode { + tpu_identity: *identity, + tpu_address: *tpu_address, + }) + .collect_vec(); let mut lock = self.current_tpu_nodes.write().await; *lock = list_of_nodes; } - if self.simple_thread_started.load(Relaxed) { // already started return; @@ -100,11 +102,12 @@ impl QuicProxyConnectionManager { exit_signal, connection_parameters, )); - } - fn create_proxy_client_endpoint(certificate: rustls::Certificate, key: rustls::PrivateKey) -> Endpoint { - + fn create_proxy_client_endpoint( + certificate: rustls::Certificate, + key: rustls::PrivateKey, + ) -> Endpoint { const ALPN_TPU_FORWARDPROXY_PROTOCOL_ID: &[u8] = b"solana-tpu-forward-proxy"; let mut endpoint = { @@ -152,7 +155,6 @@ impl QuicProxyConnectionManager { exit_signal: Arc, connection_parameters: QuicConnectionParameters, ) { - let auto_connection = AutoReconnect::new(endpoint, proxy_addr); loop { @@ -162,56 +164,58 @@ impl QuicProxyConnectionManager { } tokio::select! { - // TODO add timeout - tx = transaction_receiver.recv() => { + // TODO add timeout + tx = transaction_receiver.recv() => { - let first_tx: Vec = match tx { - Ok((_sig, tx)) => { - tx - }, - Err(e) => { - error!( - "Broadcast channel error on recv error {}", e); - continue; - } - }; - - let mut txs = vec![first_tx]; - for _ in 1..connection_parameters.number_of_transactions_per_unistream { - if let Ok((_signature, tx)) = transaction_receiver.try_recv() { - txs.push(tx); - } + let first_tx: Vec = match tx { + Ok((_sig, tx)) => { + tx + }, + Err(e) => { + error!( + "Broadcast channel error on recv error {}", e); + continue; } + }; - let tpu_fanout_nodes = current_tpu_nodes.read().await.clone(); - - trace!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy", - txs.len(), tpu_fanout_nodes.len()); - - for target_tpu_node in tpu_fanout_nodes { - Self::send_copy_of_txs_to_quicproxy( - &txs, &auto_connection, - proxy_addr, - target_tpu_node.tpu_address, - target_tpu_node.tpu_identity) - .await.unwrap(); + let mut txs = vec![first_tx]; + for _ in 1..connection_parameters.number_of_transactions_per_unistream { + if let Ok((_signature, tx)) = transaction_receiver.try_recv() { + txs.push(tx); } + } - }, - }; + let tpu_fanout_nodes = current_tpu_nodes.read().await.clone(); + + trace!("Sending copy of transaction batch of {} txs to {} tpu nodes via quic proxy", + txs.len(), tpu_fanout_nodes.len()); + + for target_tpu_node in tpu_fanout_nodes { + Self::send_copy_of_txs_to_quicproxy( + &txs, &auto_connection, + proxy_addr, + target_tpu_node.tpu_address, + target_tpu_node.tpu_identity) + .await.unwrap(); + } + + }, + }; } } - async fn send_copy_of_txs_to_quicproxy(raw_tx_batch: &Vec>, auto_connection: &AutoReconnect, - _proxy_address: SocketAddr, tpu_target_address: SocketAddr, - target_tpu_identity: Pubkey) -> anyhow::Result<()> { - - let raw_tx_batch_copy = raw_tx_batch.clone(); + async fn send_copy_of_txs_to_quicproxy( + raw_tx_batch: &[Vec], + auto_connection: &AutoReconnect, + _proxy_address: SocketAddr, + tpu_target_address: SocketAddr, + target_tpu_identity: Pubkey, + ) -> anyhow::Result<()> { let mut txs = vec![]; - for raw_tx in raw_tx_batch_copy { - let tx = match bincode::deserialize::(&raw_tx) { + for raw_tx in raw_tx_batch { + let tx = match bincode::deserialize::(raw_tx) { Ok(tx) => tx, Err(err) => { bail!(err.to_string()); @@ -220,13 +224,13 @@ impl QuicProxyConnectionManager { txs.push(tx); } - for chunk in txs.chunks(CHUNK_SIZE_PER_STREAM) { - - let forwarding_request = TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, chunk.into()); + let forwarding_request = + TpuForwardingRequest::new(tpu_target_address, target_tpu_identity, chunk.into()); debug!("forwarding_request: {}", forwarding_request); - let proxy_request_raw = bincode::serialize(&forwarding_request).expect("Expect to serialize transactions"); + let proxy_request_raw = + bincode::serialize(&forwarding_request).expect("Expect to serialize transactions"); let send_result = auto_connection.send_uni(proxy_request_raw).await; @@ -238,29 +242,9 @@ impl QuicProxyConnectionManager { bail!("Failed to send data to quic proxy: {:?}", e); } } - } // -- one chunk - Ok(()) } - async fn send_proxy_request(endpoint: Endpoint, proxy_address: SocketAddr, proxy_request_raw: &Vec) -> anyhow::Result<()> { - info!("sending {} bytes to proxy", proxy_request_raw.len()); - - let connecting = endpoint.connect(proxy_address, "localhost")?; - let connection = timeout(Duration::from_millis(500), connecting).await??; - let mut send = connection.open_uni().await?; - - send.write_all(proxy_request_raw).await?; - - send.finish().await?; - - debug!("connection stats (lite-rpc to proxy): {}", connection_stats(&connection)); - Ok(()) - } - - } - - diff --git a/services/src/tpu_utils/quinn_auto_reconnect.rs b/services/src/tpu_utils/quinn_auto_reconnect.rs index 8fb4bf2b..ee5a3b09 100644 --- a/services/src/tpu_utils/quinn_auto_reconnect.rs +++ b/services/src/tpu_utils/quinn_auto_reconnect.rs @@ -29,17 +29,14 @@ impl AutoReconnect { pub async fn send_uni(&self, payload: Vec) -> anyhow::Result<()> { // TOOD do smart error handling + reconnect - let mut send_stream = timeout( - Duration::from_secs(4), self.refresh() - .await.open_uni()) - .await - .context("open uni stream for sending")??; + let mut send_stream = timeout(Duration::from_secs(4), self.refresh().await.open_uni()) + .await + .context("open uni stream for sending")??; send_stream.write_all(payload.as_slice()).await?; send_stream.finish().await?; Ok(()) } - pub async fn refresh(&self) -> Connection { { let lock = self.current.read().await; @@ -55,7 +52,7 @@ impl AutoReconnect { } let mut lock = self.current.write().await; let maybe_conn = lock.as_ref(); - return match maybe_conn { + match maybe_conn { Some(current) => { if current.close_reason().is_some() { let old_stable_id = current.stable_id(); @@ -66,7 +63,6 @@ impl AutoReconnect { ); let new_connection = self.create_connection().await; - let prev_stable_id = current.stable_id(); *lock = Some(new_connection.clone()); // let old_conn = lock.replace(new_connection.clone()); self.reconnect_count.fetch_add(1, Ordering::SeqCst); @@ -78,7 +74,7 @@ impl AutoReconnect { self.reconnect_count.load(Ordering::SeqCst) ); - new_connection.clone() + new_connection } else { debug!("Reuse connection {} with write-lock", current.stable_id()); current.clone() @@ -92,9 +88,9 @@ impl AutoReconnect { // let old_conn = foo.replace(Some(new_connection.clone())); debug!("Create initial connection {}", new_connection.stable_id()); - new_connection.clone() + new_connection } - }; + } } async fn create_connection(&self) -> Connection { @@ -112,4 +108,3 @@ impl fmt::Display for AutoReconnect { write!(f, "Connection to {}", self.target_address,) } } - diff --git a/services/src/tpu_utils/tpu_connection_path.rs b/services/src/tpu_utils/tpu_connection_path.rs index 048f7c88..a71108d5 100644 --- a/services/src/tpu_utils/tpu_connection_path.rs +++ b/services/src/tpu_utils/tpu_connection_path.rs @@ -1,4 +1,3 @@ - use std::fmt::Display; use std::net::SocketAddr; @@ -12,7 +11,9 @@ impl Display for TpuConnectionPath { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { TpuConnectionPath::QuicDirectPath => write!(f, "Direct QUIC connection to TPU"), - TpuConnectionPath::QuicForwardProxyPath { forward_proxy_address } => { + TpuConnectionPath::QuicForwardProxyPath { + forward_proxy_address, + } => { write!(f, "QUIC Forward Proxy on {}", forward_proxy_address) } } diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index 27d2e882..984c3101 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -10,6 +10,9 @@ use solana_lite_rpc_core::{ }; use super::tpu_connection_manager::TpuConnectionManager; +use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager; +use crate::tpu_utils::tpu_connection_path::TpuConnectionPath; +use crate::tpu_utils::tpu_service::ConnectionManager::{DirectTpu, QuicProxy}; use solana_sdk::{ pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair, signer::Signer, slot_history::Slot, }; @@ -26,9 +29,6 @@ use tokio::{ sync::RwLock, time::{Duration, Instant}, }; -use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager; -use crate::tpu_utils::tpu_connection_path::TpuConnectionPath; -use crate::tpu_utils::tpu_service::ConnectionManager::{DirectTpu, QuicProxy}; lazy_static::lazy_static! { static ref NB_CLUSTER_NODES: GenericGauge = @@ -72,8 +72,12 @@ pub struct TpuService { #[derive(Clone)] enum ConnectionManager { - DirectTpu { tpu_connection_manager: Arc }, - QuicProxy { quic_proxy_connection_manager: Arc }, + DirectTpu { + tpu_connection_manager: Arc, + }, + QuicProxy { + quic_proxy_connection_manager: Arc, + }, } impl TpuService { @@ -91,25 +95,25 @@ impl TpuService { ) .expect("Failed to initialize QUIC client certificates"); - let connection_manager = - match config.tpu_connection_path { - TpuConnectionPath::QuicDirectPath => { - let tpu_connection_manager = - TpuConnectionManager::new(certificate, key, - config.fanout_slots as usize).await; - DirectTpu { - tpu_connection_manager: Arc::new(tpu_connection_manager), - } + let connection_manager = match config.tpu_connection_path { + TpuConnectionPath::QuicDirectPath => { + let tpu_connection_manager = + TpuConnectionManager::new(certificate, key, config.fanout_slots as usize).await; + DirectTpu { + tpu_connection_manager: Arc::new(tpu_connection_manager), } - TpuConnectionPath::QuicForwardProxyPath { forward_proxy_address } => { - let quic_proxy_connection_manager = - QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await; + } + TpuConnectionPath::QuicForwardProxyPath { + forward_proxy_address, + } => { + let quic_proxy_connection_manager = + QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await; - QuicProxy { - quic_proxy_connection_manager: Arc::new(quic_proxy_connection_manager), - } + QuicProxy { + quic_proxy_connection_manager: Arc::new(quic_proxy_connection_manager), } - }; + } + }; Ok(Self { current_slot: Arc::new(AtomicU64::new(current_slot)), @@ -194,13 +198,17 @@ impl TpuService { self.config.quic_connection_params, ) .await; - }, - QuicProxy { quic_proxy_connection_manager } => { - quic_proxy_connection_manager.update_connection( - self.broadcast_sender.clone(), - connections_to_keep, - self.config.quic_connection_params, - ).await; + } + QuicProxy { + quic_proxy_connection_manager, + } => { + quic_proxy_connection_manager + .update_connection( + self.broadcast_sender.clone(), + connections_to_keep, + self.config.quic_connection_params, + ) + .await; } } } diff --git a/services/src/transaction_service.rs b/services/src/transaction_service.rs index d89bcb45..8cacf59e 100644 --- a/services/src/transaction_service.rs +++ b/services/src/transaction_service.rs @@ -11,12 +11,12 @@ use crate::{ tx_sender::{TransactionInfo, TxSender}, }; use anyhow::bail; +use solana_lite_rpc_core::solana_utils::SerializableTransaction; use solana_lite_rpc_core::{ block_store::{BlockInformation, BlockStore}, notifications::NotificationSender, AnyhowJoinHandle, }; -use solana_lite_rpc_core::solana_utils::SerializableTransaction; use solana_sdk::{commitment_config::CommitmentConfig, transaction::VersionedTransaction}; use tokio::{ sync::mpsc::{self, Sender, UnboundedSender}, diff --git a/services/tests/literpc_tpu_quic_server_integrationtest.rs b/services/tests/literpc_tpu_quic_server_integrationtest.rs index a652e8cc..e10700dc 100644 --- a/services/tests/literpc_tpu_quic_server_integrationtest.rs +++ b/services/tests/literpc_tpu_quic_server_integrationtest.rs @@ -33,7 +33,6 @@ use tokio::task::JoinHandle; use tokio::time::sleep; use tracing_subscriber::util::SubscriberInitExt; - #[derive(Copy, Clone, Debug)] struct TestCaseParams { sample_tx_count: u32,