Improve comments, error handling, and retry delay handling

Michael Vines 2021-09-07 18:08:20 -05:00
parent d3aa9bce26
commit 007fb3abe5
1 changed file with 44 additions and 31 deletions


@@ -41,7 +41,7 @@ pub const MAX_FANOUT_SLOTS: u64 = 100;
 #[derive(Clone, Debug)]
 pub struct TpuClientConfig {
     /// The range of upcoming slots to include when determining which
-    /// leaders to send transactions to (min: 1, max: 100)
+    /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`)
     pub fanout_slots: u64,
 }
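
The updated doc comment refers to `MAX_FANOUT_SLOTS` instead of a hard-coded 100. As a minimal illustrative sketch (not code from this commit; it assumes the `solana_client::tpu_client` module path), a caller could keep a requested fanout inside the documented bounds like this:

    use solana_client::tpu_client::{TpuClientConfig, MAX_FANOUT_SLOTS};

    // Illustrative helper: clamp a requested fanout into the documented
    // [1, MAX_FANOUT_SLOTS] range before building the config.
    fn bounded_config(requested_fanout_slots: u64) -> TpuClientConfig {
        TpuClientConfig {
            fanout_slots: requested_fanout_slots.clamp(1, MAX_FANOUT_SLOTS),
        }
    }
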
@@ -63,13 +63,14 @@ pub struct TpuClient {
 }
 
 impl TpuClient {
-    /// Serializes and sends a transaction to the current leader's TPU port
+    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
+    /// size
     pub fn send_transaction(&self, transaction: &Transaction) -> bool {
         let wire_transaction = serialize(transaction).expect("serialization should succeed");
         self.send_wire_transaction(&wire_transaction)
     }
 
-    /// Sends a transaction to the current leader's TPU port
+    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
     pub fn send_wire_transaction(&self, wire_transaction: &[u8]) -> bool {
         let mut sent = false;
         for tpu_address in self
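
The rewritten comments describe fan-out to the current and upcoming leader TPUs rather than a single leader port. A rough usage sketch under those semantics (the endpoint URLs and transaction are placeholders, and a `TpuClient::new(Arc<RpcClient>, websocket_url, TpuClientConfig)` constructor is assumed; it is not shown in this diff):

    use std::sync::Arc;
    use solana_client::{
        rpc_client::RpcClient,
        tpu_client::{TpuClient, TpuClientConfig},
    };
    use solana_sdk::transaction::Transaction;

    // Placeholder endpoints; point these at a real cluster before use.
    fn send_with_fanout(signed_transaction: &Transaction) -> Result<bool, Box<dyn std::error::Error>> {
        let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
        let tpu_client = TpuClient::new(
            rpc_client,
            "ws://127.0.0.1:8900",
            TpuClientConfig { fanout_slots: 4 },
        )?;
        // The transaction is serialized once and sent to the TPU ports of the
        // current leader and the upcoming leaders inside the fanout window.
        Ok(tpu_client.send_transaction(signed_transaction))
    }
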
@@ -119,14 +120,14 @@ struct LeaderTpuCache {
 }
 
 impl LeaderTpuCache {
-    fn new(rpc_client: &RpcClient, first_slot: Slot) -> Self {
-        let leaders = Self::fetch_slot_leaders(rpc_client, first_slot).unwrap_or_default();
-        let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client).unwrap_or_default();
-        Self {
+    fn new(rpc_client: &RpcClient, first_slot: Slot) -> Result<Self> {
+        let leaders = Self::fetch_slot_leaders(rpc_client, first_slot)?;
+        let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client)?;
+        Ok(Self {
             first_slot,
             leaders,
             leader_tpu_map,
-        }
+        })
     }
 
     // Last slot that has a cached leader pubkey
@@ -144,7 +145,13 @@ impl LeaderTpuCache {
                     if leader_set.insert(*leader) {
                         leader_sockets.push(*tpu_socket);
                     }
+                } else {
+                    // The leader is probably delinquent
+                    trace!("TPU not available for leader {}", leader);
                 }
+            } else {
+                // Overran the local leader schedule cache
+                warn!("Leader not known for slot {}", leader_slot);
             }
         }
         leader_sockets
@@ -245,7 +252,7 @@ impl LeaderTpuService {
         let start_slot = rpc_client.get_max_shred_insert_slot()?;
 
         let recent_slots = RecentLeaderSlots::new(start_slot);
-        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot)));
+        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot)?));
 
         let subscription = if !websocket_url.is_empty() {
             let recent_slots = recent_slots.clone();
@@ -317,42 +324,48 @@ impl LeaderTpuService {
                     break;
                 }
 
+                // Sleep a few slots before checking if leader cache needs to be refreshed again
+                std::thread::sleep(Duration::from_millis(sleep_ms));
+                sleep_ms = 1000;
+
                 // Refresh cluster TPU ports every 5min in case validators restart with new port configuration
                 // or new validators come online
                 if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
-                    if let Ok(leader_tpu_map) = LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) {
-                        leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map;
-                        last_cluster_refresh = Instant::now();
-                    } else {
-                        sleep_ms = 100;
-                        continue;
+                    match LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) {
+                        Ok(leader_tpu_map) => {
+                            leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map;
+                            last_cluster_refresh = Instant::now();
+                        }
+                        Err(err) => {
+                            warn!("Failed to fetch cluster tpu sockets: {}", err);
+                            sleep_ms = 100;
+                        }
                     }
                 }
 
-                // Sleep a few slots before checking if leader cache needs to be refreshed again
-                std::thread::sleep(Duration::from_millis(sleep_ms));
-                let current_slot = recent_slots.estimated_current_slot();
-                if current_slot
+                let estimated_current_slot = recent_slots.estimated_current_slot();
+                if estimated_current_slot
                     >= leader_tpu_cache
                         .read()
                         .unwrap()
                         .last_slot()
                         .saturating_sub(MAX_FANOUT_SLOTS)
                 {
-                    if let Ok(slot_leaders) =
-                        LeaderTpuCache::fetch_slot_leaders(&rpc_client, current_slot)
-                    {
-                        let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
-                        leader_tpu_cache.first_slot = current_slot;
-                        leader_tpu_cache.leaders = slot_leaders;
-                    } else {
-                        sleep_ms = 100;
-                        continue;
+                    match LeaderTpuCache::fetch_slot_leaders(&rpc_client, estimated_current_slot) {
+                        Ok(slot_leaders) => {
+                            let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
+                            leader_tpu_cache.first_slot = estimated_current_slot;
+                            leader_tpu_cache.leaders = slot_leaders;
+                        }
+                        Err(err) => {
+                            warn!(
+                                "Failed to fetch slot leaders (current estimated slot: {}): {}",
+                                estimated_current_slot, err
+                            );
+                            sleep_ms = 100;
+                        }
                     }
                 }
-                sleep_ms = 1000;
             }
         }
     }
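
Taken together, the loop now sleeps at the top of each iteration, resets the delay to its normal one-second cadence, and shortens only the next delay to 100ms when a refresh fails, instead of calling `continue` and skipping the remaining work. A self-contained sketch of that retry-delay pattern (the `refresh` function and its error type are hypothetical stand-ins, not code from this commit):

    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };
    use std::time::Duration;

    // Hypothetical stand-in for the RPC refresh calls made by the real service.
    fn refresh() -> Result<(), String> {
        Ok(())
    }

    fn run(exit: Arc<AtomicBool>) {
        let mut sleep_ms = 1000;
        loop {
            if exit.load(Ordering::Relaxed) {
                break;
            }

            // Sleep first, then assume the normal cadence; a failed refresh below
            // only shortens the *next* delay rather than skipping the rest of the loop.
            std::thread::sleep(Duration::from_millis(sleep_ms));
            sleep_ms = 1000;

            if let Err(err) = refresh() {
                eprintln!("refresh failed, retrying sooner: {}", err);
                sleep_ms = 100;
            }
        }
    }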