Fixing connection problems after creating too many streams
This commit is contained in:
parent
47048e31d0
commit
079bd87552
|
@ -130,11 +130,11 @@ mod tests {
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let (connection_manager, _jh) = ConnectionManager::new(endpoint);
|
||||
let (connection_manager, _jh) = ConnectionManager::new(endpoint, 10);
|
||||
notify_server_start.notify_one();
|
||||
notify_subscription.notified().await;
|
||||
for msg in msgs {
|
||||
connection_manager.dispatch(msg, 10, false).await;
|
||||
connection_manager.dispatch(msg, 10).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -8,9 +8,9 @@ use crate::quic::{
|
|||
configure_server::ALPN_GEYSER_PROTOCOL_ID, skip_verification::ClientSkipServerVerification,
|
||||
};
|
||||
|
||||
pub const DEFAULT_MAX_STREAMS: u32 = 32768;
|
||||
pub const DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS: u32 = 128;
|
||||
pub const DEFAULT_MAX_TRANSACTION_STREAMS: u32 = 8192;
|
||||
pub const DEFAULT_MAX_STREAMS: u32 = 4096;
|
||||
pub const DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS: u32 = 4;
|
||||
pub const DEFAULT_MAX_TRANSACTION_STREAMS: u32 = 32;
|
||||
pub const DEFAULT_MAX_ACCOUNT_STREAMS: u32 =
|
||||
DEFAULT_MAX_STREAMS - DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS - DEFAULT_MAX_TRANSACTION_STREAMS;
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use quinn::{Connection, Endpoint, VarInt};
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Arc;
|
||||
use std::{collections::VecDeque, time::Duration};
|
||||
use tokio::sync::Semaphore;
|
||||
|
@ -18,6 +19,8 @@ pub struct ConnectionData {
|
|||
stream_semaphore_for_accounts: Arc<Semaphore>,
|
||||
stream_semaphore_for_slot_data: Arc<Semaphore>,
|
||||
stream_semaphore_for_transactions: Arc<Semaphore>,
|
||||
lagging_count: Arc<AtomicU64>,
|
||||
max_lagging_stream: u64,
|
||||
}
|
||||
|
||||
impl ConnectionData {
|
||||
|
@ -25,6 +28,7 @@ impl ConnectionData {
|
|||
id: u64,
|
||||
connection: Connection,
|
||||
connections_parameters: ConnectionParameters,
|
||||
max_lagging_stream: u64,
|
||||
) -> Self {
|
||||
let accounts_streams_count = connections_parameters
|
||||
.max_number_of_streams
|
||||
|
@ -44,6 +48,8 @@ impl ConnectionData {
|
|||
stream_semaphore_for_transactions: Arc::new(Semaphore::new(
|
||||
connections_parameters.streams_for_transactions as usize,
|
||||
)),
|
||||
lagging_count: Arc::new(AtomicU64::new(0)),
|
||||
max_lagging_stream,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -57,7 +63,7 @@ pub struct ConnectionManager {
|
|||
}
|
||||
|
||||
impl ConnectionManager {
|
||||
pub fn new(endpoint: Endpoint) -> (Self, JoinHandle<()>) {
|
||||
pub fn new(endpoint: Endpoint, max_lagging_stream: u64) -> (Self, JoinHandle<()>) {
|
||||
let connections: Arc<RwLock<VecDeque<ConnectionData>>> =
|
||||
Arc::new(RwLock::new(VecDeque::new()));
|
||||
// create a task to add incoming connections
|
||||
|
@ -93,6 +99,7 @@ impl ConnectionManager {
|
|||
connection,
|
||||
current_id,
|
||||
connections_parameters,
|
||||
max_lagging_stream,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
@ -125,6 +132,7 @@ impl ConnectionManager {
|
|||
connection: Connection,
|
||||
current_id: u64,
|
||||
connections_parameters: ConnectionParameters,
|
||||
max_lagging_stream: u64,
|
||||
) {
|
||||
// connection established
|
||||
// add the connection in the connections list
|
||||
|
@ -134,6 +142,7 @@ impl ConnectionManager {
|
|||
current_id,
|
||||
connection.clone(),
|
||||
connections_parameters,
|
||||
max_lagging_stream,
|
||||
));
|
||||
drop(lk);
|
||||
|
||||
|
@ -187,7 +196,7 @@ impl ConnectionManager {
|
|||
});
|
||||
}
|
||||
|
||||
pub async fn dispatch(&self, message: Message, retry_count: u64, drop_lagger: bool) {
|
||||
pub async fn dispatch(&self, message: Message, retry_count: u64) {
|
||||
let lk = self.connections.read().await;
|
||||
|
||||
for connection_data in lk.iter() {
|
||||
|
@ -214,6 +223,8 @@ impl ConnectionManager {
|
|||
),
|
||||
};
|
||||
let id = connection_data.id;
|
||||
let lagging_count = connection_data.lagging_count.clone();
|
||||
let max_lagging_stream = connection_data.max_lagging_stream;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let permit_result = semaphore.clone().try_acquire_owned();
|
||||
|
@ -227,11 +238,15 @@ impl ConnectionManager {
|
|||
id,
|
||||
message_type
|
||||
);
|
||||
if drop_lagger {
|
||||
let lc =
|
||||
lagging_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
if lc > max_lagging_stream {
|
||||
connection.close(VarInt::from_u32(0), b"laggy client");
|
||||
return;
|
||||
}
|
||||
semaphore.acquire_owned().await.expect("Permit is aquired")
|
||||
let p = semaphore.acquire_owned().await.expect("Permit is aquired");
|
||||
lagging_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
|
||||
p
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ impl QuicServer {
|
|||
pub fn new(
|
||||
runtime: Runtime,
|
||||
config: ConfigQuicPlugin,
|
||||
drop_laggers: bool,
|
||||
max_lagging: u64,
|
||||
) -> anyhow::Result<Self> {
|
||||
let server_config = configure_server(
|
||||
config.quic_parameters.max_number_of_streams_per_client,
|
||||
|
@ -65,7 +65,7 @@ impl QuicServer {
|
|||
.unwrap();
|
||||
let retry_count = config.number_of_retries;
|
||||
|
||||
let (quic_connection_manager, _jh) = ConnectionManager::new(endpoint);
|
||||
let (quic_connection_manager, _jh) = ConnectionManager::new(endpoint, max_lagging);
|
||||
log::info!("Connection manager sucessfully started");
|
||||
while let Some(channel_message) = data_channel_tx.recv().await {
|
||||
match channel_message {
|
||||
|
@ -78,7 +78,6 @@ impl QuicServer {
|
|||
slot,
|
||||
compression_type,
|
||||
retry_count,
|
||||
drop_laggers,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -88,21 +87,15 @@ impl QuicServer {
|
|||
parent,
|
||||
commitment_level,
|
||||
});
|
||||
quic_connection_manager
|
||||
.dispatch(message, retry_count, drop_laggers)
|
||||
.await;
|
||||
quic_connection_manager.dispatch(message, retry_count).await;
|
||||
}
|
||||
ChannelMessage::BlockMeta(block_meta) => {
|
||||
let message = Message::BlockMetaMsg(block_meta);
|
||||
quic_connection_manager
|
||||
.dispatch(message, retry_count, drop_laggers)
|
||||
.await;
|
||||
quic_connection_manager.dispatch(message, retry_count).await;
|
||||
}
|
||||
ChannelMessage::Transaction(transaction) => {
|
||||
let message = Message::TransactionMsg(transaction);
|
||||
quic_connection_manager
|
||||
.dispatch(message, retry_count, drop_laggers)
|
||||
.await;
|
||||
quic_connection_manager.dispatch(message, retry_count).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -129,7 +122,6 @@ fn process_account_message(
|
|||
slot: Slot,
|
||||
compression_type: CompressionType,
|
||||
retry_count: u64,
|
||||
drop_laggers: bool,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
let slot_identifier = SlotIdentifier { slot };
|
||||
|
@ -142,8 +134,6 @@ fn process_account_message(
|
|||
);
|
||||
|
||||
let message = Message::AccountMsg(geyser_account);
|
||||
quic_connection_manager
|
||||
.dispatch(message, retry_count, drop_laggers)
|
||||
.await;
|
||||
quic_connection_manager.dispatch(message, retry_count).await;
|
||||
});
|
||||
}
|
||||
|
|
|
@ -7,5 +7,5 @@ pub struct Args {
|
|||
pub url: String,
|
||||
|
||||
#[clap(short, long)]
|
||||
pub rpc_url: String,
|
||||
pub rpc_url: Option<String>,
|
||||
}
|
||||
|
|
|
@ -40,16 +40,9 @@ pub mod cli;
|
|||
async fn main() {
|
||||
let args = Args::parse();
|
||||
println!("Connecting");
|
||||
let client = Client::new(
|
||||
args.url,
|
||||
ConnectionParameters {
|
||||
max_number_of_streams: 100_000,
|
||||
streams_for_slot_data: 128,
|
||||
streams_for_transactions: 10_000,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let client = Client::new(args.url, ConnectionParameters::default())
|
||||
.await
|
||||
.unwrap();
|
||||
println!("Connected");
|
||||
|
||||
let bytes_transfered = Arc::new(AtomicU64::new(0));
|
||||
|
@ -64,9 +57,9 @@ async fn main() {
|
|||
let slot_slot = Arc::new(AtomicU64::new(0));
|
||||
let blockmeta_slot = Arc::new(AtomicU64::new(0));
|
||||
|
||||
{
|
||||
if let Some(rpc_url) = args.rpc_url {
|
||||
let cluster_slot = cluster_slot.clone();
|
||||
let rpc = RpcClient::new(args.rpc_url);
|
||||
let rpc = RpcClient::new(rpc_url);
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
|
|
@ -12,6 +12,6 @@ pub struct Args {
|
|||
#[clap(short = 'l', long, default_value_t = 1_000_000)]
|
||||
pub account_data_size: u32,
|
||||
|
||||
#[clap(short, long, default_value_t = false)]
|
||||
pub drop_laggers: bool,
|
||||
#[clap(short, long, default_value_t = 1_000)]
|
||||
pub max_lagging: u64,
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ pub fn main() -> anyhow::Result<()> {
|
|||
},
|
||||
number_of_retries: 100,
|
||||
};
|
||||
let quic_server = QuicServer::new(runtime, config, args.drop_laggers).unwrap();
|
||||
let quic_server = QuicServer::new(runtime, config, args.max_lagging).unwrap();
|
||||
|
||||
let mut instant = Instant::now();
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ impl GeyserPlugin for QuicGeyserPlugin {
|
|||
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
|
||||
})?;
|
||||
|
||||
let quic_server = QuicServer::new(runtime, config.quic_plugin, true).map_err(|_| {
|
||||
let quic_server = QuicServer::new(runtime, config.quic_plugin, 20_000).map_err(|_| {
|
||||
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
|
||||
})?;
|
||||
self.quic_server = Some(quic_server);
|
||||
|
|
Loading…
Reference in New Issue