diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index d095ee73..996854cd 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -104,31 +104,26 @@ pub fn create_grpc_multiplex_blocks_subscription( tasks }; - let finalized_blocks_stream = { + let finalized_blockmeta_stream = { let commitment_config = CommitmentConfig::finalized(); let mut streams = Vec::new(); for grpc_source in &grpc_sources { let stream = create_geyser_reconnecting_stream( grpc_source.clone(), - GeyserFilter(commitment_config).blocks_and_txs(), + GeyserFilter(commitment_config).blocks_meta(), ); streams.push(stream); } - - create_multiplexed_stream(streams, BlockExtractor(commitment_config)) + create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config)) }; // by blockhash -<<<<<<< HEAD - let mut confirmed_blocks_stream = std::pin::pin!(confirmed_blocks_stream); - let mut finalized_blocks_stream = std::pin::pin!(finalized_blocks_stream); -======= let mut recent_confirmed_blocks = HashMap::::new(); let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream); ->>>>>>> ssh/main let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5)); + let mut last_finalized_slot: Slot = 0; let mut cleanup_without_recv_blocks: u8 = 0; let mut cleanup_without_recv_blocks_meta: u8 = 0; const MAX_ALLOWED_CLEANUP_WITHOUT_RECV: u8 = 12; // 12*5 = 60s without recving data @@ -144,8 +139,9 @@ pub fn create_grpc_multiplex_blocks_subscription( warn!("Confirmed block channel has no receivers {e:?}"); continue } + recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block); }, - finalized_block = finalized_blocks_stream.next() => { + meta_finalized = finalized_blockmeta_stream.next() => { cleanup_without_recv_blocks_meta = 0; let blockhash = meta_finalized.expect("finalized block meta from stream"); if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) { @@ -170,6 +166,14 @@ pub fn create_grpc_multiplex_blocks_subscription( } cleanup_without_recv_blocks += 1; cleanup_without_recv_blocks_meta += 1; + let size_before = recent_confirmed_blocks.len(); + recent_confirmed_blocks.retain(|_blockhash, block| { + last_finalized_slot == 0 || block.slot > last_finalized_slot - 100 + }); + let cnt_cleaned = size_before - recent_confirmed_blocks.len(); + if cnt_cleaned > 0 { + debug!("cleaned {} confirmed blocks from cache", cnt_cleaned); + } } } } @@ -270,4 +274,4 @@ pub fn create_grpc_multiplex_slots_subscription( }); (multiplexed_messages, jh) -} +} \ No newline at end of file