Swapping notify channel with broadcast channel

This commit is contained in:
godmodegalactus 2024-04-02 15:07:01 +02:00
parent ca3fa46139
commit 12a6832c56
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
4 changed files with 36 additions and 35 deletions

View File

@ -36,7 +36,7 @@ use solana_transaction_status::{Reward, RewardType};
use std::cell::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::sync::{broadcast, Notify};
use tracing::trace_span;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
@ -278,7 +278,7 @@ pub fn create_block_processing_task(
mut exit_notify: broadcast::Receiver<()>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
'main_loop: loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"block_client".to_string(),
@ -293,7 +293,7 @@ pub fn create_block_processing_task(
// connect to grpc
let mut client =
connect_with_timeout_hacked(grpc_addr.clone(), grpc_x_token.clone()).await?;
let mut stream = tokio::select! {
let mut stream = tokio::select! {
res = client
.subscribe_once(
HashMap::new(),
@ -354,6 +354,7 @@ pub fn create_block_processing_task(
log::error!("Grpc block subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
})
}

View File

@ -14,7 +14,7 @@ use std::{
Arc,
},
};
use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};
use tokio::sync::{broadcast, OwnedSemaphorePermit, RwLock, Semaphore};
pub type EndpointPool = RotatingQueue<Endpoint>;
@ -40,7 +40,6 @@ pub struct QuicConnection {
identity: Pubkey,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: Arc<Notify>,
timeout_counters: Arc<AtomicU64>,
has_connected_once: Arc<AtomicBool>,
}
@ -51,7 +50,6 @@ impl QuicConnection {
endpoint: Endpoint,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: Arc<Notify>,
) -> Self {
Self {
connection: Arc::new(RwLock::new(None)),
@ -60,13 +58,16 @@ impl QuicConnection {
identity,
socket_address,
connection_params,
exit_notify,
timeout_counters: Arc::new(AtomicU64::new(0)),
has_connected_once: Arc::new(AtomicBool::new(false)),
}
}
async fn connect(&self, is_already_connected: bool) -> Option<Connection> {
async fn connect(
&self,
is_already_connected: bool,
exit_notify: broadcast::Receiver<()>,
) -> Option<Connection> {
QuicConnectionUtils::connect(
self.identity,
is_already_connected,
@ -74,12 +75,12 @@ impl QuicConnection {
self.socket_address,
self.connection_params.connection_timeout,
self.connection_params.connection_retry_count,
self.exit_notify.clone(),
exit_notify,
)
.await
}
pub async fn get_connection(&self) -> Option<Connection> {
pub async fn get_connection(&self, exit_notify: broadcast::Receiver<()>) -> Option<Connection> {
// get new connection reset if necessary
let last_stable_id = self.last_stable_id.load(Ordering::Relaxed) as usize;
let conn = self.connection.read().await.clone();
@ -95,7 +96,7 @@ impl QuicConnection {
Some(connection)
} else {
NB_QUIC_CONNECTION_RESET.inc();
let new_conn = self.connect(true).await;
let new_conn = self.connect(true, exit_notify).await;
if let Some(new_conn) = new_conn {
*conn = Some(new_conn);
conn.clone()
@ -116,7 +117,7 @@ impl QuicConnection {
// connection has recently been established/ just use it
return (*lk).clone();
}
let connection = self.connect(false).await;
let connection = self.connect(false, exit_notify).await;
*lk = connection.clone();
self.has_connected_once.store(true, Ordering::Relaxed);
connection
@ -124,17 +125,16 @@ impl QuicConnection {
}
}
pub async fn send_transaction(&self, tx: &Vec<u8>) {
pub async fn send_transaction(&self, tx: &Vec<u8>, mut exit_notify: broadcast::Receiver<()>) {
let connection_retry_count = self.connection_params.connection_retry_count;
for _ in 0..connection_retry_count {
let mut do_retry = false;
let exit_notify = self.exit_notify.clone();
let connection = tokio::select! {
conn = self.get_connection() => {
conn = self.get_connection(exit_notify.resubscribe()) => {
conn
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
@ -149,7 +149,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
@ -164,7 +164,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
@ -247,7 +247,6 @@ impl QuicConnectionPool {
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_notify: Arc<Notify>,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self {
@ -259,7 +258,6 @@ impl QuicConnectionPool {
endpoints.get().expect("Should get and endpoint"),
socket_address,
connection_parameters,
exit_notify.clone(),
));
}
Self {

View File

@ -14,7 +14,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::{sync::Notify, time::timeout};
use tokio::{sync::broadcast, time::timeout};
lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
@ -221,7 +221,7 @@ impl QuicConnectionUtils {
addr: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_notified: Arc<Notify>,
mut exit_notified: broadcast::Receiver<()>,
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
@ -230,7 +230,7 @@ impl QuicConnectionUtils {
res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
_ = exit_notified.recv() => {
break;
}
}
@ -240,7 +240,7 @@ impl QuicConnectionUtils {
res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
_ = exit_notified.recv() => {
break;
}
}

View File

@ -15,7 +15,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::{
broadcast::{Receiver, Sender},
broadcast::{self, Receiver, Sender},
Notify,
};
@ -48,7 +48,7 @@ struct ActiveConnection {
tpu_address: SocketAddr,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
exit_notifier: Arc<Notify>,
exit_notifier: broadcast::Sender<()>,
}
impl ActiveConnection {
@ -59,13 +59,14 @@ impl ActiveConnection {
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
) -> Self {
let (exit_notifier, _) = broadcast::channel(1);
Self {
endpoints,
tpu_address,
identity,
data_cache,
connection_parameters,
exit_notifier: Arc::new(Notify::new()),
exit_notifier,
}
}
@ -79,7 +80,6 @@ impl ActiveConnection {
let fill_notify = Arc::new(Notify::new());
let identity = self.identity;
let exit_notifier = self.exit_notifier.clone();
NB_QUIC_ACTIVE_CONNECTIONS.inc();
@ -95,7 +95,6 @@ impl ActiveConnection {
self.endpoints.clone(),
addr,
self.connection_parameters,
exit_notifier.clone(),
max_number_of_connections,
max_uni_stream_connections,
);
@ -109,7 +108,7 @@ impl ActiveConnection {
let priorization_heap = priorization_heap.clone();
let data_cache = self.data_cache.clone();
let fill_notify = fill_notify.clone();
let exit_notifier = exit_notifier.clone();
let mut exit_notifier = self.exit_notifier.subscribe();
tokio::spawn(async move {
let mut current_blockheight =
data_cache.block_information_store.get_last_blockheight();
@ -118,7 +117,7 @@ impl ActiveConnection {
tx = transaction_reciever.recv() => {
tx
},
_ = exit_notifier.notified() => {
_ = exit_notifier.recv() => {
break;
}
};
@ -165,12 +164,14 @@ impl ActiveConnection {
if let Ok(PooledConnection { connection, permit }) =
connection_pool.get_pooled_connection().await
{
let exit_notifier = self.exit_notifier.subscribe();
tokio::task::spawn(async move {
let _permit = permit;
connection.get_connection().await;
connection.get_connection(exit_notifier).await;
});
};
let mut exit_notifier = self.exit_notifier.subscribe();
'main_loop: loop {
tokio::select! {
_ = fill_notify.notified() => {
@ -197,6 +198,7 @@ impl ActiveConnection {
break;
},
};
let exit_notifier = self.exit_notifier.subscribe();
tokio::spawn(async move {
// permit will be used to send all the transaction and then destroyed
@ -205,13 +207,13 @@ impl ActiveConnection {
NB_QUIC_TASKS.inc();
connection.send_transaction(tx.transaction.as_ref()).await;
connection.send_transaction(tx.transaction.as_ref(), exit_notifier).await;
timer.observe_duration();
NB_QUIC_TASKS.dec();
});
}
},
_ = exit_notifier.notified() => {
_ = exit_notifier.recv() => {
break 'main_loop;
}
}
@ -289,7 +291,7 @@ impl TpuConnectionManager {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value.exit_notifier.notify_waiters();
let _ = value.exit_notifier.send(());
false
} else {
true