Replacing notify by cancellation token to correctly shutdown tasks

This commit is contained in:
godmodegalactus 2024-03-27 21:01:19 +01:00
parent 9926d2e5c1
commit d1abc345cf
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
9 changed files with 42 additions and 32 deletions

5
Cargo.lock generated
View File

@ -1779,7 +1779,7 @@ dependencies = [
[[package]]
name = "geyser-grpc-connector"
version = "0.10.1+yellowstone.1.12"
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4#ce6ca26028c4466e0236657a76b9db2cccf4d535"
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize5#85258149711ec75a58be9e7bd2c3f732bd3b074f"
dependencies = [
"anyhow",
"async-stream",
@ -1792,6 +1792,7 @@ dependencies = [
"merge-streams",
"solana-sdk",
"tokio",
"tokio-util",
"tonic-health",
"tracing",
"url",
@ -4632,6 +4633,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"tonic-health",
"tracing",
"yellowstone-grpc-client",
@ -4808,6 +4810,7 @@ dependencies = [
"solana-version",
"thiserror",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
]

View File

@ -70,6 +70,7 @@ dotenv = "0.15.0"
async-channel = "1.8.0"
merge-streams = "0.1.2"
jemallocator = "0.5"
tokio-util = "0.7.10"
quinn = "0.10.2"
quinn-proto = "0.10.5"

View File

@ -9,7 +9,7 @@ license = "AGPL"
[dependencies]
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize5", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
solana-sdk = { workspace = true }
solana-rpc-client-api = { workspace = true }
@ -48,3 +48,4 @@ itertools = {workspace = true}
prometheus = { workspace = true }
lazy_static = { workspace = true }
tonic-health = { workspace = true }
tokio-util = { workspace = true }

View File

@ -11,11 +11,10 @@ use solana_sdk::commitment_config::CommitmentConfig;
use solana_lite_rpc_core::solana_utils::hash_from_str;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use tokio_util::sync::CancellationToken;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Instant};
use tracing::debug_span;
@ -31,7 +30,7 @@ use crate::grpc_subscription::from_grpc_block_update;
fn create_grpc_multiplex_processed_block_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_sender: tokio::sync::mpsc::Sender<ProducedBlock>,
exit_notify: Arc<Notify>,
exit_notfier: CancellationToken,
) -> Vec<JoinHandle<()>> {
const COMMITMENT_CONFIG: CommitmentConfig = CommitmentConfig::processed();
@ -43,7 +42,7 @@ fn create_grpc_multiplex_processed_block_task(
grpc_source.clone(),
GeyserFilter(COMMITMENT_CONFIG).blocks_and_txs(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notfier.clone(),
);
tasks.push(task);
}
@ -66,7 +65,7 @@ fn create_grpc_multiplex_processed_block_task(
res = blocks_rx.recv() => {
res
},
_ = exit_notify.notified() => {
_ = exit_notfier.cancelled() => {
break;
}
};
@ -130,7 +129,7 @@ fn create_grpc_multiplex_block_info_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_info_sender: tokio::sync::mpsc::Sender<BlockInfo>,
commitment_config: CommitmentConfig,
exit_notify: Arc<Notify>,
exit_notifier: CancellationToken,
) -> Vec<JoinHandle<()>> {
let (autoconnect_tx, mut blocks_rx) = tokio::sync::mpsc::channel(10);
let mut tasks = vec![];
@ -139,7 +138,7 @@ fn create_grpc_multiplex_block_info_task(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notifier.clone(),
);
tasks.push(task);
}
@ -151,7 +150,7 @@ fn create_grpc_multiplex_block_info_task(
res = blocks_rx.recv() => {
res
},
_ = exit_notify.notified() => {
_ = exit_notifier.cancelled() => {
break;
}
};
@ -263,7 +262,8 @@ pub fn create_grpc_multiplex_blocks_subscription(
tokio::sync::mpsc::channel::<BlockInfo>(500);
let (block_info_sender_finalized, mut block_info_reciever_finalized) =
tokio::sync::mpsc::channel::<BlockInfo>(500);
let exit_notify = Arc::new(Notify::new());
let exit_notify = CancellationToken::new();
let processed_block_sender = processed_block_sender.clone();
reconnect_attempts += 1;
@ -442,7 +442,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
} // -- END receiver loop
exit_notify.notify_waiters();
exit_notify.cancel();
futures::future::join_all(task_list).await;
} // -- END reconnect loop
});
@ -474,7 +474,7 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
let jh_multiplex_task = tokio::spawn(async move {
loop {
let (autoconnect_tx, mut slots_rx) = tokio::sync::mpsc::channel(10);
let exit_notify = Arc::new(Notify::new());
let exit_notify = CancellationToken::new();
let tasks = grpc_sources
.clone()
@ -537,7 +537,7 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
}
}
} // -- END receiver loop
exit_notify.notify_waiters();
exit_notify.cancel();
futures::future::join_all(tasks).await;
} // -- END reconnect loop
});

View File

@ -33,6 +33,7 @@ use solana_sdk::{
transaction::TransactionError,
};
use solana_transaction_status::{Reward, RewardType};
use tokio_util::sync::CancellationToken;
use std::cell::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
@ -275,7 +276,7 @@ pub fn create_block_processing_task(
grpc_x_token: Option<String>,
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
exit_notfier: Arc<Notify>,
exit_notfier: CancellationToken,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
@ -338,7 +339,7 @@ pub fn create_block_processing_task(
}
};
},
_ = exit_notfier.notified() => {
_ = exit_notfier.cancelled() => {
break;
}
}

View File

@ -40,6 +40,7 @@ chrono = { workspace = true }
rustls = { workspace = true }
solana-lite-rpc-core = { workspace = true }
solana-lite-rpc-util = { workspace = true }
tokio-util = { workspace = true }
[dev-dependencies]
tracing = { workspace = true }

View File

@ -14,7 +14,8 @@ use std::{
Arc,
},
};
use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
use tokio_util::sync::CancellationToken;
pub type EndpointPool = RotatingQueue<Endpoint>;
@ -40,7 +41,7 @@ pub struct QuicConnection {
identity: Pubkey,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: Arc<Notify>,
exit_notify: CancellationToken,
timeout_counters: Arc<AtomicU64>,
has_connected_once: Arc<AtomicBool>,
}
@ -51,7 +52,7 @@ impl QuicConnection {
endpoint: Endpoint,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: Arc<Notify>,
exit_notify: CancellationToken,
) -> Self {
Self {
connection: Arc::new(RwLock::new(None)),
@ -134,7 +135,7 @@ impl QuicConnection {
conn = self.get_connection() => {
conn
},
_ = exit_notify.notified() => {
_ = exit_notify.cancelled() => {
break;
}
};
@ -149,7 +150,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.cancelled() => {
break;
}
};
@ -164,7 +165,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.cancelled() => {
break;
}
};
@ -247,7 +248,7 @@ impl QuicConnectionPool {
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_notify: Arc<Notify>,
exit_notify: CancellationToken,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self {

View File

@ -14,7 +14,8 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::{sync::Notify, time::timeout};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
@ -219,7 +220,7 @@ impl QuicConnectionUtils {
addr: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_notified: Arc<Notify>,
exit_notified: CancellationToken,
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
@ -228,7 +229,7 @@ impl QuicConnectionUtils {
res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
_ = exit_notified.cancelled() => {
break;
}
}
@ -238,7 +239,7 @@ impl QuicConnectionUtils {
res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
_ = exit_notified.cancelled() => {
break;
}
}

View File

@ -18,6 +18,7 @@ use tokio::sync::{
broadcast::{Receiver, Sender},
Notify,
};
use tokio_util::sync::CancellationToken;
use crate::{
quic_connection::{PooledConnection, QuicConnectionPool},
@ -48,7 +49,7 @@ struct ActiveConnection {
tpu_address: SocketAddr,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
exit_notifier: Arc<Notify>,
exit_notifier: CancellationToken,
}
impl ActiveConnection {
@ -65,7 +66,7 @@ impl ActiveConnection {
identity,
data_cache,
connection_parameters,
exit_notifier: Arc::new(Notify::new()),
exit_notifier: CancellationToken::new(),
}
}
@ -115,7 +116,7 @@ impl ActiveConnection {
tx = transaction_reciever.recv() => {
tx
},
_ = exit_notifier.notified() => {
_ = exit_notifier.cancelled() => {
break;
}
};
@ -208,7 +209,7 @@ impl ActiveConnection {
});
}
},
_ = exit_notifier.notified() => {
_ = exit_notifier.cancelled() => {
break 'main_loop;
}
}
@ -286,7 +287,7 @@ impl TpuConnectionManager {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value.exit_notifier.notify_waiters();
value.exit_notifier.cancel();
false
} else {
true