parent
38da1ed987
commit
67641f6d40
|
@ -162,8 +162,12 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
);
|
||||
}
|
||||
|
||||
let _processed_blocks_tasks =
|
||||
// tasks which should be cleaned up uppon reconnect
|
||||
let mut task_list: Vec<AbortHandle> = vec![];
|
||||
|
||||
let processed_blocks_tasks =
|
||||
create_grpc_multiplex_processed_block_stream(&grpc_sources, processed_block_sender);
|
||||
task_list.extend(processed_blocks_tasks);
|
||||
|
||||
let confirmed_blockmeta_stream = create_grpc_multiplex_block_meta_stream(
|
||||
&grpc_sources,
|
||||
|
@ -175,6 +179,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
);
|
||||
|
||||
// by blockhash
|
||||
// this map consumes sigificant amount of memory constrainted by CLEANUP_SLOTS_BEHIND_FINALIZED
|
||||
let mut recent_processed_blocks = HashMap::<String, ProducedBlock>::new();
|
||||
// both streams support backpressure, see log:
|
||||
// grpc_subscription_autoreconnect_tasks: downstream receiver did not pick put message for 500ms - keep waiting
|
||||
|
@ -183,11 +188,11 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
|
||||
let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
|
||||
let mut last_finalized_slot: Slot = 0;
|
||||
const CLEANUP_SLOTS_BEHIND_FINALIZED: u64 = 100;
|
||||
let mut cleanup_without_recv_full_blocks: u8 = 0;
|
||||
let mut cleanup_without_confirmed_recv_blocks_meta: u8 = 0;
|
||||
let mut cleanup_without_finalized_recv_blocks_meta: u8 = 0;
|
||||
let mut confirmed_block_not_yet_processed = HashSet::<String>::new();
|
||||
let _finalized_block_not_yet_processed = HashSet::<String>::new();
|
||||
|
||||
// start logging errors when we recieve first finalized block
|
||||
let mut startup_completed = false;
|
||||
|
@ -195,6 +200,8 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
'recv_loop: loop {
|
||||
tokio::select! {
|
||||
processed_block = processed_block_reciever.recv() => {
|
||||
cleanup_without_recv_full_blocks = 0;
|
||||
|
||||
let processed_block = processed_block.expect("processed block from stream");
|
||||
trace!("got processed block {} with blockhash {}",
|
||||
processed_block.slot, processed_block.blockhash.clone());
|
||||
|
@ -243,6 +250,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
}
|
||||
},
|
||||
_ = cleanup_tick.tick() => {
|
||||
// timebased restart
|
||||
if cleanup_without_recv_full_blocks > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
|
||||
cleanup_without_confirmed_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV ||
|
||||
cleanup_without_finalized_recv_blocks_meta > MAX_ALLOWED_CLEANUP_WITHOUT_RECV {
|
||||
|
@ -257,15 +265,16 @@ pub fn create_grpc_multiplex_blocks_subscription(
|
|||
cleanup_without_finalized_recv_blocks_meta += 1;
|
||||
let size_before = recent_processed_blocks.len();
|
||||
recent_processed_blocks.retain(|_blockhash, block| {
|
||||
last_finalized_slot == 0 || block.slot > last_finalized_slot - 100
|
||||
last_finalized_slot == 0 || block.slot > last_finalized_slot.saturating_sub(CLEANUP_SLOTS_BEHIND_FINALIZED)
|
||||
});
|
||||
let cnt_cleaned = size_before - recent_processed_blocks.len();
|
||||
let cnt_cleaned = size_before.saturating_sub(recent_processed_blocks.len());
|
||||
if cnt_cleaned > 0 {
|
||||
debug!("cleaned {} processed blocks from cache", cnt_cleaned);
|
||||
}
|
||||
}
|
||||
}
|
||||
} // -- END receiver loop
|
||||
task_list.iter().for_each(|task| task.abort());
|
||||
} // -- END reconnect loop
|
||||
});
|
||||
|
||||
|
|
Loading…
Reference in New Issue