Merge pull request #265 from blockworks-foundation/reducing_network_usage_by_fetching_only_finalized_blockmeta
Reduce network by only fetching block meta instead of whole block
This commit is contained in:
commit
e39a2f5f02
|
@ -1,5 +1,6 @@
|
||||||
use crate::grpc_stream_utils::channelize_stream;
|
use crate::grpc_stream_utils::channelize_stream;
|
||||||
use crate::grpc_subscription::map_block_update;
|
use crate::grpc_subscription::map_block_update;
|
||||||
|
use futures::StreamExt;
|
||||||
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
|
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
|
||||||
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
|
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
|
||||||
};
|
};
|
||||||
|
@ -7,7 +8,6 @@ use geyser_grpc_connector::grpcmultiplex_fastestwins::{
|
||||||
create_multiplexed_stream, FromYellowstoneExtractor,
|
create_multiplexed_stream, FromYellowstoneExtractor,
|
||||||
};
|
};
|
||||||
use log::info;
|
use log::info;
|
||||||
use merge_streams::MergeStreams;
|
|
||||||
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
|
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
|
||||||
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
|
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
|
||||||
use solana_lite_rpc_core::AnyhowJoinHandle;
|
use solana_lite_rpc_core::AnyhowJoinHandle;
|
||||||
|
@ -36,6 +36,21 @@ impl FromYellowstoneExtractor for BlockExtractor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct BlockHashExtractor(CommitmentConfig);
|
||||||
|
|
||||||
|
impl FromYellowstoneExtractor for BlockHashExtractor {
|
||||||
|
type Target = String;
|
||||||
|
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(u64, String)> {
|
||||||
|
match update.update_oneof {
|
||||||
|
Some(UpdateOneof::Block(block)) => Some((block.slot, block.blockhash)),
|
||||||
|
Some(UpdateOneof::BlockMeta(block_meta)) => {
|
||||||
|
Some((block_meta.slot, block_meta.blockhash))
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn create_grpc_multiplex_blocks_subscription(
|
pub fn create_grpc_multiplex_blocks_subscription(
|
||||||
grpc_sources: Vec<GrpcSourceConfig>,
|
grpc_sources: Vec<GrpcSourceConfig>,
|
||||||
) -> (Receiver<ProducedBlock>, AnyhowJoinHandle) {
|
) -> (Receiver<ProducedBlock>, AnyhowJoinHandle) {
|
||||||
|
@ -68,28 +83,57 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
||||||
create_multiplexed_stream(streams, BlockExtractor(commitment_config))
|
create_multiplexed_stream(streams, BlockExtractor(commitment_config))
|
||||||
};
|
};
|
||||||
|
|
||||||
let multiplex_stream_finalized = {
|
let (sender, multiplexed_merged_blocks) =
|
||||||
|
tokio::sync::broadcast::channel::<ProducedBlock>(1000);
|
||||||
|
|
||||||
|
let meta_stream_finalized = {
|
||||||
let commitment_config = CommitmentConfig::finalized();
|
let commitment_config = CommitmentConfig::finalized();
|
||||||
|
|
||||||
let mut streams = Vec::new();
|
let mut streams = Vec::new();
|
||||||
for grpc_source in &grpc_sources {
|
for grpc_source in &grpc_sources {
|
||||||
let stream = create_geyser_reconnecting_stream(
|
let stream = create_geyser_reconnecting_stream(
|
||||||
grpc_source.clone(),
|
grpc_source.clone(),
|
||||||
GeyserFilter(commitment_config).blocks_and_txs(),
|
GeyserFilter(commitment_config).blocks_meta(),
|
||||||
);
|
);
|
||||||
streams.push(stream);
|
streams.push(stream);
|
||||||
}
|
}
|
||||||
|
create_multiplexed_stream(streams, BlockHashExtractor(commitment_config))
|
||||||
create_multiplexed_stream(streams, BlockExtractor(commitment_config))
|
};
|
||||||
|
let jh_channelizer = {
|
||||||
|
// spawn merged
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
let mut map_of_confimed_blocks = HashMap::<String, ProducedBlock>::new();
|
||||||
|
let mut multiplex_stream_confirmed = std::pin::pin!(multiplex_stream_confirmed);
|
||||||
|
let mut meta_stream_finalized = std::pin::pin!(meta_stream_finalized);
|
||||||
|
let sender = sender;
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
confirmed_block = multiplex_stream_confirmed.next() => {
|
||||||
|
if let Some(confirmed_block) = confirmed_block {
|
||||||
|
if let Err(e) = sender.send(confirmed_block.clone()) {
|
||||||
|
panic!("Confirmed block stream send gave error {e:?}");
|
||||||
|
}
|
||||||
|
map_of_confimed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block);
|
||||||
|
} else {
|
||||||
|
panic!("Confirmed stream broke");
|
||||||
|
}
|
||||||
|
},
|
||||||
|
meta_finalized = meta_stream_finalized.next() => {
|
||||||
|
if let Some(blockhash) = meta_finalized {
|
||||||
|
if let Some(mut finalized_block) = map_of_confimed_blocks.remove(&blockhash) {
|
||||||
|
finalized_block.commitment_config = CommitmentConfig::finalized();
|
||||||
|
if let Err(e) = sender.send(finalized_block.clone()) {
|
||||||
|
panic!("Finalized block stream send gave error {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
};
|
};
|
||||||
|
|
||||||
let merged_stream_confirmed_finalize =
|
(multiplexed_merged_blocks, jh_channelizer)
|
||||||
(multiplex_stream_confirmed, multiplex_stream_finalized).merge();
|
|
||||||
|
|
||||||
let (multiplexed_finalized_blocks, jh_channelizer) =
|
|
||||||
channelize_stream(merged_stream_confirmed_finalize);
|
|
||||||
|
|
||||||
(multiplexed_finalized_blocks, jh_channelizer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SlotExtractor {}
|
struct SlotExtractor {}
|
||||||
|
|
Loading…
Reference in New Issue