Porting back the exit notify change

This commit is contained in:
godmodegalactus 2024-03-27 17:57:40 +01:00
parent 79748c5a58
commit dfe8d972eb
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
3 changed files with 69 additions and 25 deletions

2
Cargo.lock generated
View File

@ -1779,7 +1779,7 @@ dependencies = [
[[package]]
name = "geyser-grpc-connector"
version = "0.10.1+yellowstone.1.12"
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize3#ae56e0f5f894933bea046e8f220f74df3eab5355"
source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4#ce6ca26028c4466e0236657a76b9db2cccf4d535"
dependencies = [
"anyhow",
"async-stream",

View File

@ -9,7 +9,7 @@ license = "AGPL"
[dependencies]
#geyser-grpc-connector = { path = "../../geyser-grpc-connector" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize3", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize4", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" }
solana-sdk = { workspace = true }
solana-rpc-client-api = { workspace = true }

View File

@ -1,6 +1,7 @@
use anyhow::{bail, Context};
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::{GeyserFilter, GrpcSourceConfig, Message};
use itertools::Itertools;
use log::{debug, info, trace, warn};
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
@ -11,9 +12,11 @@ use solana_sdk::commitment_config::CommitmentConfig;
use solana_lite_rpc_core::solana_utils::hash_from_str;
use solana_lite_rpc_core::structures::block_info::BlockInfo;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::task::AbortHandle;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Instant};
use tracing::debug_span;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
@ -28,16 +31,21 @@ use crate::grpc_subscription::from_grpc_block_update;
fn create_grpc_multiplex_processed_block_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_sender: tokio::sync::mpsc::Sender<ProducedBlock>,
) -> Vec<AbortHandle> {
exit_notify: Arc<Notify>,
) -> Vec<JoinHandle<()>> {
const COMMITMENT_CONFIG: CommitmentConfig = CommitmentConfig::processed();
let (autoconnect_tx, mut blocks_rx) = tokio::sync::mpsc::channel(10);
let mut tasks = vec![];
for grpc_source in grpc_sources {
create_geyser_autoconnection_task_with_mpsc(
let task = create_geyser_autoconnection_task_with_mpsc(
grpc_source.clone(),
GeyserFilter(COMMITMENT_CONFIG).blocks_and_txs(),
autoconnect_tx.clone(),
exit_notify.clone(),
);
tasks.push(task);
}
let jh_merging_streams = tokio::task::spawn(async move {
@ -54,7 +62,15 @@ fn create_grpc_multiplex_processed_block_task(
last_tick = Instant::now();
const MAX_SIZE: usize = 1024;
match blocks_rx.recv().await {
let blocks_rx_result = tokio::select! {
res = blocks_rx.recv() => {
res
},
_ = exit_notify.notified() => {
break;
}
};
match blocks_rx_result {
Some(Message::GeyserSubscribeUpdate(subscribe_update)) => {
let mapfilter =
map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG);
@ -73,7 +89,7 @@ fn create_grpc_multiplex_processed_block_task(
.context("Send block to channel");
if send_result.is_err() {
warn!("Block channel receiver is closed - aborting");
return;
break;
}
trace!(
@ -100,12 +116,13 @@ fn create_grpc_multiplex_processed_block_task(
}
None => {
warn!("Multiplexed geyser source stream block terminated - aborting task");
return;
break;
}
}
} // -- END receiver loop
});
vec![jh_merging_streams.abort_handle()]
tasks.push(jh_merging_streams);
tasks
}
// backpressure: the mpsc sender will block grpc stream until capacity is available
@ -113,20 +130,32 @@ fn create_grpc_multiplex_block_info_task(
grpc_sources: &Vec<GrpcSourceConfig>,
block_info_sender: tokio::sync::mpsc::Sender<BlockInfo>,
commitment_config: CommitmentConfig,
) -> Vec<AbortHandle> {
exit_notify: Arc<Notify>,
) -> Vec<JoinHandle<()>> {
let (autoconnect_tx, mut blocks_rx) = tokio::sync::mpsc::channel(10);
let mut tasks = vec![];
for grpc_source in grpc_sources {
create_geyser_autoconnection_task_with_mpsc(
let task = create_geyser_autoconnection_task_with_mpsc(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_meta(),
autoconnect_tx.clone(),
exit_notify.clone(),
);
tasks.push(task);
}
let jh_merging_streams = tokio::task::spawn(async move {
let mut tip: Slot = 0;
loop {
match blocks_rx.recv().await {
let blocks_rx_result = tokio::select! {
res = blocks_rx.recv() => {
res
},
_ = exit_notify.notified() => {
break;
}
};
match blocks_rx_result {
Some(Message::GeyserSubscribeUpdate(subscribe_update)) => {
if let Some(update) = subscribe_update.update_oneof {
match update {
@ -157,7 +186,7 @@ fn create_grpc_multiplex_block_info_task(
.context("Send block to channel");
if send_result.is_err() {
warn!("Block channel receiver is closed - aborting");
return;
break;
}
trace!(
@ -184,13 +213,13 @@ fn create_grpc_multiplex_block_info_task(
}
None => {
warn!("Multiplexed geyser source stream block meta terminated - aborting task");
return;
break;
}
}
} // -- END receiver loop
});
vec![jh_merging_streams.abort_handle()]
tasks.push(jh_merging_streams);
tasks
}
/// connect to multiple grpc sources to consume processed blocks and block status update
@ -234,6 +263,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
tokio::sync::mpsc::channel::<BlockInfo>(500);
let (block_info_sender_finalized, mut block_info_reciever_finalized) =
tokio::sync::mpsc::channel::<BlockInfo>(500);
let exit_notify = Arc::new(Notify::new());
let processed_block_sender = processed_block_sender.clone();
reconnect_attempts += 1;
@ -245,11 +275,12 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
// tasks which should be cleaned up uppon reconnect
let mut task_list: Vec<AbortHandle> = vec![];
let mut task_list: Vec<JoinHandle<()>> = vec![];
let processed_blocks_tasks = create_grpc_multiplex_processed_block_task(
&grpc_sources,
processed_block_sender.clone(),
exit_notify.clone(),
);
task_list.extend(processed_blocks_tasks);
@ -259,18 +290,21 @@ pub fn create_grpc_multiplex_blocks_subscription(
&grpc_sources,
block_info_sender_processed.clone(),
CommitmentConfig::processed(),
exit_notify.clone(),
);
task_list.extend(jh_meta_task_processed);
let jh_meta_task_confirmed = create_grpc_multiplex_block_info_task(
&grpc_sources,
block_info_sender_confirmed.clone(),
CommitmentConfig::confirmed(),
exit_notify.clone(),
);
task_list.extend(jh_meta_task_confirmed);
let jh_meta_task_finalized = create_grpc_multiplex_block_info_task(
&grpc_sources,
block_info_sender_finalized.clone(),
CommitmentConfig::finalized(),
exit_notify.clone(),
);
task_list.extend(jh_meta_task_finalized);
@ -408,7 +442,8 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
} // -- END receiver loop
task_list.iter().for_each(|task| task.abort());
exit_notify.notify_waiters();
futures::future::join_all(task_list).await;
} // -- END reconnect loop
});
@ -439,13 +474,20 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
let jh_multiplex_task = tokio::spawn(async move {
loop {
let (autoconnect_tx, mut slots_rx) = tokio::sync::mpsc::channel(10);
for grpc_source in &grpc_sources {
create_geyser_autoconnection_task_with_mpsc(
grpc_source.clone(),
GeyserFilter(COMMITMENT_CONFIG).slots(),
autoconnect_tx.clone(),
);
}
let exit_notify = Arc::new(Notify::new());
let tasks = grpc_sources
.clone()
.iter()
.map(|grpc_source| {
create_geyser_autoconnection_task_with_mpsc(
grpc_source.clone(),
GeyserFilter(COMMITMENT_CONFIG).slots(),
autoconnect_tx.clone(),
exit_notify.clone(),
)
})
.collect_vec();
'recv_loop: loop {
let next = tokio::time::timeout(Duration::from_secs(30), slots_rx.recv()).await;
@ -495,6 +537,8 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
}
}
} // -- END receiver loop
exit_notify.notify_waiters();
futures::future::join_all(tasks).await;
} // -- END reconnect loop
});