optimize mapping of produced block (#382)
optimize mapping of produced block
This commit is contained in:
parent
06373c0844
commit
69d7dbb123
|
@ -49,7 +49,7 @@ fn create_grpc_multiplex_processed_block_task(
|
|||
let jh_merging_streams = tokio::task::spawn(async move {
|
||||
let mut slots_processed = BTreeSet::<u64>::new();
|
||||
let mut last_tick = Instant::now();
|
||||
loop {
|
||||
'recv_loop: loop {
|
||||
// recv loop
|
||||
if last_tick.elapsed() > Duration::from_millis(800) {
|
||||
warn!(
|
||||
|
@ -70,16 +70,23 @@ fn create_grpc_multiplex_processed_block_task(
|
|||
};
|
||||
match blocks_rx_result {
|
||||
Some(Message::GeyserSubscribeUpdate(subscribe_update)) => {
|
||||
let mapfilter =
|
||||
map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG);
|
||||
if let Some((slot, produced_block)) = mapfilter {
|
||||
assert_eq!(COMMITMENT_CONFIG, produced_block.commitment_config);
|
||||
// note: avoid mapping of full block as long as possible
|
||||
let extracted_slot = extract_slot_from_yellowstone_update(&subscribe_update);
|
||||
if let Some(slot) = extracted_slot {
|
||||
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
|
||||
// it means that the slot is too old to process
|
||||
if !slots_processed.contains(&slot)
|
||||
&& (slots_processed.len() < MAX_SIZE / 2
|
||||
|| slot > slots_processed.first().cloned().unwrap_or_default())
|
||||
if slots_processed.contains(&slot) {
|
||||
continue 'recv_loop;
|
||||
}
|
||||
if slots_processed.len() >= MAX_SIZE / 2
|
||||
&& slot <= slots_processed.first().cloned().unwrap_or_default()
|
||||
{
|
||||
continue 'recv_loop;
|
||||
}
|
||||
|
||||
let mapfilter =
|
||||
map_block_from_yellowstone_update(*subscribe_update, COMMITMENT_CONFIG);
|
||||
if let Some((_slot, produced_block)) = mapfilter {
|
||||
let send_started_at = Instant::now();
|
||||
let send_result = block_sender
|
||||
.send(produced_block)
|
||||
|
@ -552,6 +559,16 @@ pub fn create_grpc_multiplex_processed_slots_subscription(
|
|||
(multiplexed_messages_rx, jh_multiplex_task)
|
||||
}
|
||||
|
||||
fn extract_slot_from_yellowstone_update(update: &SubscribeUpdate) -> Option<Slot> {
|
||||
match &update.update_oneof {
|
||||
// list is not exhaustive
|
||||
Some(UpdateOneof::Slot(update_message)) => Some(update_message.slot),
|
||||
Some(UpdateOneof::BlockMeta(update_message)) => Some(update_message.slot),
|
||||
Some(UpdateOneof::Block(update_message)) => Some(update_message.slot),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn map_slot_from_yellowstone_update(update: SubscribeUpdate) -> Option<Slot> {
|
||||
match update.update_oneof {
|
||||
Some(UpdateOneof::Slot(update_slot_message)) => Some(update_slot_message.slot),
|
||||
|
|
Loading…
Reference in New Issue