Send program deploy txs to up to 2 leaders (#15421)

This commit is contained in:
Tyera Eulberg 2021-02-18 20:14:48 -07:00 committed by GitHub
parent 787637e292
commit 4e84869c8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 19 deletions

View File

@ -1,4 +1,4 @@
use crate::send_tpu::{get_leader_tpu, send_transaction_tpu}; use crate::send_tpu::{get_leader_tpus, send_transaction_tpu};
use crate::{ use crate::{
checks::*, checks::*,
cli::{ cli::{
@ -55,6 +55,7 @@ use std::{
}; };
const DATA_CHUNK_SIZE: usize = 229; // Keep program chunks under PACKET_DATA_SIZE const DATA_CHUNK_SIZE: usize = 229; // Keep program chunks under PACKET_DATA_SIZE
const NUM_TPU_LEADERS: u64 = 2;
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub enum ProgramCliCommand { pub enum ProgramCliCommand {
@ -1577,7 +1578,7 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
let cluster_nodes = rpc_client.get_cluster_nodes().ok(); let cluster_nodes = rpc_client.get_cluster_nodes().ok();
loop { loop {
progress_bar.set_message("Finding leader node..."); progress_bar.set_message("Finding leader nodes...");
let epoch_info = rpc_client.get_epoch_info()?; let epoch_info = rpc_client.get_epoch_info()?;
let mut slot = epoch_info.absolute_slot; let mut slot = epoch_info.absolute_slot;
let mut last_epoch_fetch = Instant::now(); let mut last_epoch_fetch = Instant::now();
@ -1586,8 +1587,9 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
leader_schedule_epoch = epoch_info.epoch; leader_schedule_epoch = epoch_info.epoch;
} }
let mut tpu_address = get_leader_tpu( let mut tpu_addresses = get_leader_tpus(
min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
NUM_TPU_LEADERS,
leader_schedule.as_ref(), leader_schedule.as_ref(),
cluster_nodes.as_ref(), cluster_nodes.as_ref(),
); );
@ -1596,10 +1598,12 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
let mut pending_transactions = HashMap::new(); let mut pending_transactions = HashMap::new();
let num_transactions = transactions.len(); let num_transactions = transactions.len();
for transaction in transactions { for transaction in transactions {
if let Some(tpu_address) = tpu_address { if !tpu_addresses.is_empty() {
let wire_transaction = let wire_transaction =
serialize(&transaction).expect("serialization should succeed"); serialize(&transaction).expect("serialization should succeed");
send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction); for tpu_address in &tpu_addresses {
send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction);
}
} else { } else {
let _result = rpc_client let _result = rpc_client
.send_transaction_with_config( .send_transaction_with_config(
@ -1625,8 +1629,9 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
if last_epoch_fetch.elapsed() > Duration::from_millis(400) { if last_epoch_fetch.elapsed() > Duration::from_millis(400) {
let epoch_info = rpc_client.get_epoch_info()?; let epoch_info = rpc_client.get_epoch_info()?;
last_epoch_fetch = Instant::now(); last_epoch_fetch = Instant::now();
tpu_address = get_leader_tpu( tpu_addresses = get_leader_tpus(
min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
NUM_TPU_LEADERS,
leader_schedule.as_ref(), leader_schedule.as_ref(),
cluster_nodes.as_ref(), cluster_nodes.as_ref(),
); );
@ -1677,17 +1682,20 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
} }
let epoch_info = rpc_client.get_epoch_info()?; let epoch_info = rpc_client.get_epoch_info()?;
tpu_address = get_leader_tpu( tpu_addresses = get_leader_tpus(
min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch), min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
NUM_TPU_LEADERS,
leader_schedule.as_ref(), leader_schedule.as_ref(),
cluster_nodes.as_ref(), cluster_nodes.as_ref(),
); );
for transaction in pending_transactions.values() { for transaction in pending_transactions.values() {
if let Some(tpu_address) = tpu_address { if !tpu_addresses.is_empty() {
let wire_transaction = let wire_transaction =
serialize(transaction).expect("serialization should succeed"); serialize(&transaction).expect("serialization should succeed");
send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction); for tpu_address in &tpu_addresses {
send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction);
}
} else { } else {
let _result = rpc_client let _result = rpc_client
.send_transaction_with_config( .send_transaction_with_config(

View File

@ -1,21 +1,38 @@
use log::*; use log::*;
use solana_client::rpc_response::{RpcContactInfo, RpcLeaderSchedule}; use solana_client::rpc_response::{RpcContactInfo, RpcLeaderSchedule};
use solana_sdk::clock::NUM_CONSECUTIVE_LEADER_SLOTS;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
pub fn get_leader_tpu( pub fn get_leader_tpus(
slot_index: u64, slot_index: u64,
num_leaders: u64,
leader_schedule: Option<&RpcLeaderSchedule>, leader_schedule: Option<&RpcLeaderSchedule>,
cluster_nodes: Option<&Vec<RpcContactInfo>>, cluster_nodes: Option<&Vec<RpcContactInfo>>,
) -> Option<SocketAddr> { ) -> Vec<SocketAddr> {
leader_schedule? let leaders: Vec<_> = (0..num_leaders)
.iter() .filter_map(|i| {
.find(|(_pubkey, slots)| slots.iter().any(|slot| *slot as u64 == slot_index)) leader_schedule?
.and_then(|(pubkey, _)| {
cluster_nodes?
.iter() .iter()
.find(|contact_info| contact_info.pubkey == *pubkey) .find(|(_pubkey, slots)| {
.and_then(|contact_info| contact_info.tpu) slots.iter().any(|slot| {
*slot as u64 == (slot_index + (i * NUM_CONSECUTIVE_LEADER_SLOTS))
})
})
.and_then(|(pubkey, _)| {
cluster_nodes?
.iter()
.find(|contact_info| contact_info.pubkey == *pubkey)
.and_then(|contact_info| contact_info.tpu)
})
}) })
.collect();
let mut unique_leaders = vec![];
for leader in leaders.into_iter() {
if !unique_leaders.contains(&leader) {
unique_leaders.push(leader);
}
}
unique_leaders
} }
pub fn send_transaction_tpu( pub fn send_transaction_tpu(