Cli: deploy programs via TPU (#13090)

* Deploy: send write transactions to leader tpu

* Less apparent stalling during confirmation

* Add EpochInfo mock

* Only get cluster nodes once

* Send deploy writes to next leader
This commit is contained in:
Tyera Eulberg 2020-10-23 10:03:29 -06:00 committed by GitHub
parent 7d2729f6bd
commit 16944e218f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 83 additions and 21 deletions

View File

@ -1,7 +1,8 @@
use crate::{
checks::*, cluster_query::*, feature::*, inflation::*, nonce::*, spend_utils::*, stake::*,
validator_info::*, vote::*,
checks::*, cluster_query::*, feature::*, inflation::*, nonce::*, send_tpu::*, spend_utils::*,
stake::*, validator_info::*, vote::*,
};
use bincode::serialize;
use bip39::{Language, Mnemonic, MnemonicType, Seed};
use clap::{value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
use log::*;
@ -32,7 +33,7 @@ use solana_client::{
rpc_client::RpcClient,
rpc_config::{RpcLargestAccountsFilter, RpcSendTransactionConfig},
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
rpc_response::RpcKeyedAccount,
rpc_response::{RpcKeyedAccount, RpcLeaderSchedule},
};
#[cfg(not(test))]
use solana_faucet::faucet::request_airdrop_transaction;
@ -42,7 +43,7 @@ use solana_rbpf::vm::EbpfVm;
use solana_remote_wallet::remote_wallet::RemoteWalletManager;
use solana_sdk::{
bpf_loader, bpf_loader_deprecated,
clock::{Epoch, Slot, DEFAULT_TICKS_PER_SECOND},
clock::{Epoch, Slot},
commitment_config::CommitmentConfig,
decode_error::DecodeError,
hash::Hash,
@ -64,12 +65,13 @@ use solana_stake_program::{
use solana_transaction_status::{EncodedTransaction, UiTransactionEncoding};
use solana_vote_program::vote_state::VoteAuthorize;
use std::{
cmp::min,
collections::HashMap,
error,
fmt::Write as FmtWrite,
fs::File,
io::{Read, Write},
net::{IpAddr, SocketAddr},
net::{IpAddr, SocketAddr, UdpSocket},
str::FromStr,
sync::Arc,
thread::sleep,
@ -1031,33 +1033,50 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
) -> Result<(), Box<dyn error::Error>> {
let progress_bar = new_spinner_progress_bar();
let mut send_retries = 5;
let mut leader_schedule: Option<RpcLeaderSchedule> = None;
let mut leader_schedule_epoch = 0;
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let cluster_nodes = rpc_client.get_cluster_nodes().ok();
loop {
let mut status_retries = 15;
progress_bar.set_message("Finding leader node...");
let epoch_info = rpc_client.get_epoch_info_with_commitment(commitment)?;
if epoch_info.epoch > leader_schedule_epoch || leader_schedule.is_none() {
leader_schedule = rpc_client
.get_leader_schedule_with_commitment(Some(epoch_info.absolute_slot), commitment)?;
leader_schedule_epoch = epoch_info.epoch;
}
let tpu_address = get_leader_tpu(
min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
leader_schedule.as_ref(),
cluster_nodes.as_ref(),
);
// Send all transactions
let mut pending_transactions = HashMap::new();
let num_transactions = transactions.len();
for transaction in transactions {
if cfg!(not(test)) {
// Delay ~1 tick between write transactions in an attempt to reduce AccountInUse errors
// when all the write transactions modify the same program account (eg, deploying a
// new program)
sleep(Duration::from_millis(1000 / DEFAULT_TICKS_PER_SECOND));
if let Some(tpu_address) = tpu_address {
let wire_transaction =
serialize(&transaction).expect("serialization should succeed");
send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction);
} else {
let _result = rpc_client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
preflight_commitment: Some(commitment.commitment),
..RpcSendTransactionConfig::default()
},
)
.ok();
}
let _result = rpc_client
.send_transaction_with_config(
&transaction,
RpcSendTransactionConfig {
preflight_commitment: Some(commitment.commitment),
..RpcSendTransactionConfig::default()
},
)
.ok();
pending_transactions.insert(transaction.signatures[0], transaction);
progress_bar.set_message(&format!(
"[{}/{}] Transactions sent",
"[{}/{}] Total Transactions sent",
pending_transactions.len(),
num_transactions
));
@ -1093,6 +1112,11 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
let _ = pending_transactions.remove(&signature);
}
}
progress_bar.set_message(&format!(
"[{}/{}] Transactions confirmed",
num_transactions - pending_transactions.len(),
num_transactions
));
}
if pending_transactions.is_empty() {

View File

@ -26,6 +26,7 @@ pub mod cluster_query;
pub mod feature;
pub mod inflation;
pub mod nonce;
pub mod send_tpu;
pub mod spend_utils;
pub mod stake;
pub mod test_utils;

29
cli/src/send_tpu.rs Normal file
View File

@ -0,0 +1,29 @@
use log::*;
use solana_client::rpc_response::{RpcContactInfo, RpcLeaderSchedule};
use std::net::{SocketAddr, UdpSocket};
pub fn get_leader_tpu(
slot_index: u64,
leader_schedule: Option<&RpcLeaderSchedule>,
cluster_nodes: Option<&Vec<RpcContactInfo>>,
) -> Option<SocketAddr> {
leader_schedule?
.iter()
.find(|(_pubkey, slots)| slots.iter().any(|slot| *slot as u64 == slot_index))
.and_then(|(pubkey, _)| {
cluster_nodes?
.iter()
.find(|contact_info| contact_info.pubkey == *pubkey)
.and_then(|contact_info| contact_info.tpu)
})
}
pub fn send_transaction_tpu(
send_socket: &UdpSocket,
tpu_address: &SocketAddr,
wire_transaction: &[u8],
) {
if let Err(err) = send_socket.send_to(wire_transaction, tpu_address) {
warn!("Failed to send transaction to {}: {:?}", tpu_address, err);
}
}

View File

@ -6,6 +6,7 @@ use crate::{
};
use serde_json::{json, Number, Value};
use solana_sdk::{
epoch_info::EpochInfo,
fee_calculator::{FeeCalculator, FeeRateGovernor},
instruction::InstructionError,
signature::Signature,
@ -58,6 +59,13 @@ impl RpcSender for MockSender {
serde_json::to_value(FeeCalculator::default()).unwrap(),
),
})?,
RpcRequest::GetEpochInfo => serde_json::to_value(EpochInfo {
epoch: 1,
slot_index: 2,
slots_in_epoch: 32,
absolute_slot: 34,
block_height: 34,
})?,
RpcRequest::GetFeeCalculatorForBlockhash => {
let value = if self.url == "blockhash_expired" {
Value::Null