Changing exit signal from notify to broadcast channel in grpc multiplexer

This commit is contained in:
godmodegalactus 2024-04-02 14:45:41 +02:00
parent 681334197f
commit 91cf06436a
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
3 changed files with 30 additions and 23 deletions

2
Cargo.lock generated
View File

@ -1793,7 +1793,7 @@ dependencies = [
[[package]]
name = "geyser-grpc-connector"
version = "0.10.1+yellowstone.1.12"
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4#ce6ca26028c4466e0236657a76b9db2cccf4d535"
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize-with-broadcast-exit#688e4d241dd18d18f57345d592e803aa673fcd96"
dependencies = [
"anyhow",
"async-stream",

View File

@ -9,7 +9,7 @@ license = "AGPL"
[dependencies]
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize-with-broadcast-exit", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
solana-sdk = { workspace = true }
solana-rpc-client-api = { workspace = true }

View File

@ -12,10 +12,8 @@ use solana_sdk::commitment_config::CommitmentConfig;
use solana_lite_rpc_core::solana_utils::hash_from_str;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::sync::Notify;
use tokio::sync::broadcast::{self, Receiver};
use tokio::task::JoinHandle;
use tokio::time::{sleep, Instant};
use tracing::debug_span;
@ -31,7 +29,7 @@ use crate::grpc_subscription::from_grpc_block_update;
fn create_grpc_multiplex_processed_block_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_sender: tokio::sync::mpsc::Sender<ProducedBlock>,
exit_notify: Arc<Notify>,
mut exit_notify: broadcast::Receiver<()>,
) -> Vec<JoinHandle<()>> {
const COMMITMENT_CONFIG: CommitmentConfig = CommitmentConfig::processed();
@ -43,7 +41,7 @@ fn create_grpc_multiplex_processed_block_task(
grpc_source.clone(),
GeyserFilter(COMMITMENT_CONFIG).blocks_and_txs(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
tasks.push(task);
}
@ -66,7 +64,7 @@ fn create_grpc_multiplex_processed_block_task(
res = blocks_rx.recv() => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
@ -130,7 +128,7 @@ fn create_grpc_multiplex_block_info_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_info_sender: tokio::sync::mpsc::Sender<BlockInfo>,
commitment_config: CommitmentConfig,
exit_notify: Arc<Notify>,
mut exit_notify: broadcast::Receiver<()>,
) -> Vec<JoinHandle<()>> {
let (autoconnect_tx, mut blocks_rx) = tokio::sync::mpsc::channel(10);
let mut tasks = vec![];
@ -139,7 +137,7 @@ fn create_grpc_multiplex_block_info_task(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
tasks.push(task);
}
@ -151,7 +149,7 @@ fn create_grpc_multiplex_block_info_task(
res = blocks_rx.recv() => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.recv() => {
break;
}
};
@ -263,7 +261,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
tokio::sync::mpsc::channel::<BlockInfo>(500);
let (block_info_sender_finalized, mut block_info_reciever_finalized) =
tokio::sync::mpsc::channel::<BlockInfo>(500);
let exit_notify = Arc::new(Notify::new());
let (exit_sender, exit_notify) = broadcast::channel(1);
let processed_block_sender = processed_block_sender.clone();
reconnect_attempts += 1;
@ -280,7 +278,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
let processed_blocks_tasks = create_grpc_multiplex_processed_block_task(
&grpc_sources,
processed_block_sender.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
task_list.extend(processed_blocks_tasks);
@ -290,21 +288,21 @@ pub fn create_grpc_multiplex_blocks_subscription(
&grpc_sources,
block_info_sender_processed.clone(),
CommitmentConfig::processed(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
task_list.extend(jh_meta_task_processed);
let jh_meta_task_confirmed = create_grpc_multiplex_block_info_task(
&grpc_sources,
block_info_sender_confirmed.clone(),
CommitmentConfig::confirmed(),
exit_notify.clone(),
exit_notify.resubscribe(),
);
task_list.extend(jh_meta_task_confirmed);
let jh_meta_task_finalized = create_grpc_multiplex_block_info_task(
&grpc_sources,
block_info_sender_finalized.clone(),
CommitmentConfig::finalized(),
exit_notify.clone(),
exit_notify,
);
task_list.extend(jh_meta_task_finalized);
@ -442,8 +440,12 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
} // -- END receiver loop
exit_notify.notify_waiters();
futures::future::join_all(task_list).await;
if exit_sender.send(()).is_ok() {
futures::future::join_all(task_list).await;
} else {
log::error!("Problem sending exit signal");
task_list.iter().for_each(|x| x.abort());
}
} // -- END reconnect loop
});
@ -474,9 +476,9 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
let jh_multiplex_task = tokio::spawn(async move {
loop {
let (autoconnect_tx, mut slots_rx) = tokio::sync::mpsc::channel(10);
let exit_notify = Arc::new(Notify::new());
let (exit_sender, exit_notify) = broadcast::channel(1);
let tasks = grpc_sources
let task_list = grpc_sources
.clone()
.iter()
.map(|grpc_source| {
@ -484,7 +486,7 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
grpc_source.clone(),
GeyserFilter(COMMITMENT_CONFIG).slots(),
autoconnect_tx.clone(),
exit_notify.clone(),
exit_notify.resubscribe(),
)
})
.collect_vec();
@ -537,8 +539,13 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
}
}
} // -- END receiver loop
exit_notify.notify_waiters();
futures::future::join_all(tasks).await;
if exit_sender.send(()).is_ok() {
futures::future::join_all(task_list).await;
} else {
log::error!("Problem sending exit signal");
task_list.iter().for_each(|x| x.abort());
}
} // -- END reconnect loop
});