From c8f76b8bd05c9557123dd5695b3b809e01fe9ccd Mon Sep 17 00:00:00 2001 From: sakridge Date: Fri, 17 Sep 2021 02:01:56 +0300 Subject: [PATCH] Fix tpu service fanout (#19944) --- client/src/tpu_client.rs | 46 ++++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 313269586f..2497514b41 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -119,16 +119,21 @@ struct LeaderTpuCache { first_slot: Slot, leaders: Vec, leader_tpu_map: HashMap, + slots_in_epoch: Slot, + last_epoch_info_slot: Slot, } impl LeaderTpuCache { fn new(rpc_client: &RpcClient, first_slot: Slot) -> Result { - let leaders = Self::fetch_slot_leaders(rpc_client, first_slot)?; + let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch; + let leaders = Self::fetch_slot_leaders(rpc_client, first_slot, slots_in_epoch)?; let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client)?; Ok(Self { first_slot, leaders, leader_tpu_map, + slots_in_epoch, + last_epoch_info_slot: first_slot, }) } @@ -186,8 +191,13 @@ impl LeaderTpuCache { .collect()) } - fn fetch_slot_leaders(rpc_client: &RpcClient, start_slot: Slot) -> Result> { - Ok(rpc_client.get_slot_leaders(start_slot, 2 * MAX_FANOUT_SLOTS)?) + fn fetch_slot_leaders( + rpc_client: &RpcClient, + start_slot: Slot, + slots_in_epoch: Slot, + ) -> Result> { + let fanout = (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch); + Ok(rpc_client.get_slot_leaders(start_slot, fanout)?) } } @@ -350,14 +360,28 @@ impl LeaderTpuService { } let estimated_current_slot = recent_slots.estimated_current_slot(); - if estimated_current_slot - >= leader_tpu_cache - .read() - .unwrap() - .last_slot() - .saturating_sub(MAX_FANOUT_SLOTS) - { - match LeaderTpuCache::fetch_slot_leaders(&rpc_client, estimated_current_slot) { + let (last_slot, last_epoch_info_slot, mut slots_in_epoch) = { + let leader_tpu_cache = leader_tpu_cache.read().unwrap(); + ( + leader_tpu_cache.last_slot(), + leader_tpu_cache.last_epoch_info_slot, + leader_tpu_cache.slots_in_epoch, + ) + }; + if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) { + if let Ok(epoch_info) = rpc_client.get_epoch_info() { + slots_in_epoch = epoch_info.slots_in_epoch; + let mut leader_tpu_cache = leader_tpu_cache.write().unwrap(); + leader_tpu_cache.slots_in_epoch = slots_in_epoch; + leader_tpu_cache.last_epoch_info_slot = estimated_current_slot; + } + } + if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS) { + match LeaderTpuCache::fetch_slot_leaders( + &rpc_client, + estimated_current_slot, + slots_in_epoch, + ) { Ok(slot_leaders) => { let mut leader_tpu_cache = leader_tpu_cache.write().unwrap(); leader_tpu_cache.first_slot = estimated_current_slot;