cli: Improve reliability of program deploys (#14902)

* cli: Improve reliability of program deploys

* chore: fix clippy
This commit is contained in:
Justin Starry 2021-01-29 15:15:22 +08:00 committed by GitHub
parent 01230a0105
commit 996a27d475
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 79 additions and 37 deletions

View File

@ -40,8 +40,16 @@ use solana_sdk::{
};
use solana_transaction_status::TransactionConfirmationStatus;
use std::{
cmp::min, collections::HashMap, error, fs::File, io::Read, net::UdpSocket, path::PathBuf,
sync::Arc, thread::sleep, time::Duration,
cmp::min,
collections::HashMap,
error,
fs::File,
io::Read,
net::UdpSocket,
path::PathBuf,
sync::Arc,
thread::sleep,
time::{Duration, Instant},
};
const DATA_CHUNK_SIZE: usize = 229; // Keep program chunks under PACKET_DATA_SIZE
@ -1493,15 +1501,16 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
let cluster_nodes = rpc_client.get_cluster_nodes().ok();
loop {
let mut status_retries = 15;
progress_bar.set_message("Finding leader node...");
let epoch_info = rpc_client.get_epoch_info()?;
let mut slot = epoch_info.absolute_slot;
let mut last_epoch_fetch = Instant::now();
if epoch_info.epoch > leader_schedule_epoch || leader_schedule.is_none() {
leader_schedule = rpc_client.get_leader_schedule(Some(epoch_info.absolute_slot))?;
leader_schedule_epoch = epoch_info.epoch;
}
let tpu_address = get_leader_tpu(
let mut tpu_address = get_leader_tpu(
min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
leader_schedule.as_ref(),
cluster_nodes.as_ref(),
@ -1527,52 +1536,61 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
.ok();
}
pending_transactions.insert(transaction.signatures[0], transaction);
progress_bar.set_message(&format!(
"[{}/{}] Total Transactions sent",
"[{}/{}] Transactions sent",
pending_transactions.len(),
num_transactions
));
// Throttle transactions to about 100 TPS
sleep(Duration::from_millis(10));
// Update leader periodically
if last_epoch_fetch.elapsed() > Duration::from_millis(400) {
let epoch_info = rpc_client.get_epoch_info()?;
last_epoch_fetch = Instant::now();
tpu_address = get_leader_tpu(
min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
leader_schedule.as_ref(),
cluster_nodes.as_ref(),
);
}
}
// Collect statuses for all the transactions, drop those that are confirmed
while status_retries > 0 {
status_retries -= 1;
progress_bar.set_message(&format!(
"[{}/{}] Transactions confirmed",
num_transactions - pending_transactions.len(),
num_transactions
));
let mut statuses = vec![];
loop {
let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
for pending_signatures_chunk in
pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS - 1)
pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
{
statuses.extend(
rpc_client
.get_signature_statuses_with_history(pending_signatures_chunk)?
.value
.into_iter(),
);
}
assert_eq!(statuses.len(), pending_signatures.len());
for (signature, status) in pending_signatures.into_iter().zip(statuses.into_iter()) {
if let Some(status) = status {
if let Some(confirmation_status) = &status.confirmation_status {
if *confirmation_status != TransactionConfirmationStatus::Processed {
let _ = pending_transactions.remove(&signature);
if let Ok(result) =
rpc_client.get_signature_statuses_with_history(pending_signatures_chunk)
{
let statuses = result.value;
for (signature, status) in
pending_signatures_chunk.iter().zip(statuses.into_iter())
{
if let Some(status) = status {
if let Some(confirmation_status) = &status.confirmation_status {
if *confirmation_status != TransactionConfirmationStatus::Processed
{
let _ = pending_transactions.remove(signature);
}
} else if status.confirmations.is_none()
|| status.confirmations.unwrap() > 1
{
let _ = pending_transactions.remove(signature);
}
}
} else if status.confirmations.is_none() || status.confirmations.unwrap() > 1 {
let _ = pending_transactions.remove(&signature);
}
}
slot = rpc_client.get_slot()?;
progress_bar.set_message(&format!(
"[{}/{}] Transactions confirmed",
"[{}/{}] Transactions confirmed. Retrying in {} slots",
num_transactions - pending_transactions.len(),
num_transactions
num_transactions,
last_valid_slot.saturating_sub(slot)
));
}
@ -1580,11 +1598,35 @@ fn send_and_confirm_transactions_with_spinner<T: Signers>(
return Ok(());
}
let slot = rpc_client.get_slot()?;
if slot > last_valid_slot {
break;
}
let epoch_info = rpc_client.get_epoch_info()?;
tpu_address = get_leader_tpu(
min(epoch_info.slot_index + 1, epoch_info.slots_in_epoch),
leader_schedule.as_ref(),
cluster_nodes.as_ref(),
);
for transaction in pending_transactions.values() {
if let Some(tpu_address) = tpu_address {
let wire_transaction =
serialize(transaction).expect("serialization should succeed");
send_transaction_tpu(&send_socket, &tpu_address, &wire_transaction);
} else {
let _result = rpc_client
.send_transaction_with_config(
transaction,
RpcSendTransactionConfig {
preflight_commitment: Some(commitment.commitment),
..RpcSendTransactionConfig::default()
},
)
.ok();
}
}
if cfg!(not(test)) {
// Retry twice a second
sleep(Duration::from_millis(500));