Fix tpu service fanout (#19944)

This commit is contained in:
sakridge 2021-09-17 02:01:56 +03:00 committed by GitHub
parent 58f25a8752
commit c8f76b8bd0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 35 additions and 11 deletions

View File

@ -119,16 +119,21 @@ struct LeaderTpuCache {
first_slot: Slot,
leaders: Vec<Pubkey>,
leader_tpu_map: HashMap<Pubkey, SocketAddr>,
slots_in_epoch: Slot,
last_epoch_info_slot: Slot,
}
impl LeaderTpuCache {
fn new(rpc_client: &RpcClient, first_slot: Slot) -> Result<Self> {
let leaders = Self::fetch_slot_leaders(rpc_client, first_slot)?;
let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch;
let leaders = Self::fetch_slot_leaders(rpc_client, first_slot, slots_in_epoch)?;
let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client)?;
Ok(Self {
first_slot,
leaders,
leader_tpu_map,
slots_in_epoch,
last_epoch_info_slot: first_slot,
})
}
@ -186,8 +191,13 @@ impl LeaderTpuCache {
.collect())
}
fn fetch_slot_leaders(rpc_client: &RpcClient, start_slot: Slot) -> Result<Vec<Pubkey>> {
Ok(rpc_client.get_slot_leaders(start_slot, 2 * MAX_FANOUT_SLOTS)?)
fn fetch_slot_leaders(
rpc_client: &RpcClient,
start_slot: Slot,
slots_in_epoch: Slot,
) -> Result<Vec<Pubkey>> {
let fanout = (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch);
Ok(rpc_client.get_slot_leaders(start_slot, fanout)?)
}
}
@ -350,14 +360,28 @@ impl LeaderTpuService {
}
let estimated_current_slot = recent_slots.estimated_current_slot();
if estimated_current_slot
>= leader_tpu_cache
.read()
.unwrap()
.last_slot()
.saturating_sub(MAX_FANOUT_SLOTS)
{
match LeaderTpuCache::fetch_slot_leaders(&rpc_client, estimated_current_slot) {
let (last_slot, last_epoch_info_slot, mut slots_in_epoch) = {
let leader_tpu_cache = leader_tpu_cache.read().unwrap();
(
leader_tpu_cache.last_slot(),
leader_tpu_cache.last_epoch_info_slot,
leader_tpu_cache.slots_in_epoch,
)
};
if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
if let Ok(epoch_info) = rpc_client.get_epoch_info() {
slots_in_epoch = epoch_info.slots_in_epoch;
let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
leader_tpu_cache.slots_in_epoch = slots_in_epoch;
leader_tpu_cache.last_epoch_info_slot = estimated_current_slot;
}
}
if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS) {
match LeaderTpuCache::fetch_slot_leaders(
&rpc_client,
estimated_current_slot,
slots_in_epoch,
) {
Ok(slot_leaders) => {
let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
leader_tpu_cache.first_slot = estimated_current_slot;