Solving merge issues
This commit is contained in:
parent
9369c0f03b
commit
ab9511ccb1
|
@ -104,31 +104,26 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
tasks
|
||||
};
|
||||
|
||||
let finalized_blocks_stream = {
|
||||
let finalized_blockmeta_stream = {
|
||||
let commitment_config = CommitmentConfig::finalized();
|
||||
|
||||
let mut streams = Vec::new();
|
||||
for grpc_source in &grpc_sources {
|
||||
let stream = create_geyser_reconnecting_stream(
|
||||
grpc_source.clone(),
|
||||
GeyserFilter(commitment_config).blocks_and_txs(),
|
||||
GeyserFilter(commitment_config).blocks_meta(),
|
||||
);
|
||||
streams.push(stream);
|
||||
}
|
||||
|
||||
create_multiplexed_stream(streams, BlockExtractor(commitment_config))
|
||||
create_multiplexed_stream(streams, BlockMetaHashExtractor(commitment_config))
|
||||
};
|
||||
|
||||
// by blockhash
|
||||
<<<<<<< HEAD
|
||||
let mut confirmed_blocks_stream = std::pin::pin!(confirmed_blocks_stream);
|
||||
let mut finalized_blocks_stream = std::pin::pin!(finalized_blocks_stream);
|
||||
=======
|
||||
let mut recent_confirmed_blocks = HashMap::<String, ProducedBlock>::new();
|
||||
let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream);
|
||||
>>>>>>> ssh/main
|
||||
|
||||
let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
|
||||
let mut last_finalized_slot: Slot = 0;
|
||||
let mut cleanup_without_recv_blocks: u8 = 0;
|
||||
let mut cleanup_without_recv_blocks_meta: u8 = 0;
|
||||
const MAX_ALLOWED_CLEANUP_WITHOUT_RECV: u8 = 12; // 12*5 = 60s without recving data
|
||||
|
@ -144,8 +139,9 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
warn!("Confirmed block channel has no receivers {e:?}");
|
||||
continue
|
||||
}
|
||||
recent_confirmed_blocks.insert(confirmed_block.blockhash.clone(), confirmed_block);
|
||||
},
|
||||
finalized_block = finalized_blocks_stream.next() => {
|
||||
meta_finalized = finalized_blockmeta_stream.next() => {
|
||||
cleanup_without_recv_blocks_meta = 0;
|
||||
let blockhash = meta_finalized.expect("finalized block meta from stream");
|
||||
if let Some(cached_confirmed_block) = recent_confirmed_blocks.remove(&blockhash) {
|
||||
|
@ -170,6 +166,14 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
}
|
||||
cleanup_without_recv_blocks += 1;
|
||||
cleanup_without_recv_blocks_meta += 1;
|
||||
let size_before = recent_confirmed_blocks.len();
|
||||
recent_confirmed_blocks.retain(|_blockhash, block| {
|
||||
last_finalized_slot == 0 || block.slot > last_finalized_slot - 100
|
||||
});
|
||||
let cnt_cleaned = size_before - recent_confirmed_blocks.len();
|
||||
if cnt_cleaned > 0 {
|
||||
debug!("cleaned {} confirmed blocks from cache", cnt_cleaned);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue