Improve solana catchup (#14313)

* Improve solana catchup

* Overidable port, retry, args error clean up

* print cleanup

* Reduce diff

* Tweak warns a bit
This commit is contained in:
Ryo Onodera 2021-01-05 10:10:27 +09:00 committed by GitHub
parent 0619805806
commit aa4da339ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 157 additions and 38 deletions

View File

@ -75,9 +75,11 @@ pub const DEFAULT_RPC_TIMEOUT_SECONDS: &str = "30";
pub enum CliCommand {
// Cluster Query Commands
Catchup {
node_pubkey: Pubkey,
node_pubkey: Option<Pubkey>,
node_json_rpc_url: Option<String>,
follow: bool,
our_localhost_port: Option<u16>,
log: bool,
},
ClusterDate,
ClusterVersion,
@ -1138,7 +1140,17 @@ pub fn process_command(config: &CliConfig) -> ProcessResult {
node_pubkey,
node_json_rpc_url,
follow,
} => process_catchup(&rpc_client, config, node_pubkey, node_json_rpc_url, *follow),
our_localhost_port,
log,
} => process_catchup(
&rpc_client,
config,
*node_pubkey,
node_json_rpc_url.clone(),
*follow,
*our_localhost_port,
*log,
),
CliCommand::ClusterDate => process_cluster_date(&rpc_client, config),
CliCommand::ClusterVersion => process_cluster_version(&rpc_client, config),
CliCommand::CreateAddressWithSeed {

View File

@ -41,6 +41,7 @@ use solana_sdk::{
message::Message,
native_token::lamports_to_sol,
pubkey::{self, Pubkey},
rpc_port::DEFAULT_RPC_PORT_STR,
signature::Signature,
system_instruction, system_program,
sysvar::{
@ -88,14 +89,14 @@ impl ClusterQuerySubCommands for App<'_, '_> {
.arg(
pubkey!(Arg::with_name("node_pubkey")
.index(1)
.value_name("VALIDATOR_PUBKEY")
.required(true),
.value_name("OUR_VALIDATOR_PUBKEY")
.required(false),
"Identity pubkey of the validator"),
)
.arg(
Arg::with_name("node_json_rpc_url")
.index(2)
.value_name("URL")
.value_name("OUR_URL")
.takes_value(true)
.validator(is_url)
.help("JSON RPC URL for validator, which is useful for validators with a private RPC service")
@ -106,6 +107,21 @@ impl ClusterQuerySubCommands for App<'_, '_> {
.takes_value(false)
.help("Continue reporting progress even after the validator has caught up"),
)
.arg(
Arg::with_name("our_localhost")
.long("our-localhost")
.takes_value(false)
.value_name("PORT")
.default_value(&DEFAULT_RPC_PORT_STR)
.validator(is_port)
.help("Guess Identity pubkey and validator rpc node assuming local (possibly private) validator"),
)
.arg(
Arg::with_name("log")
.long("log")
.takes_value(false)
.help("Don't update the progress inplace; instead show updates with its own new lines"),
)
.arg(commitment_arg()),
)
.subcommand(
@ -379,14 +395,31 @@ pub fn parse_catchup(
matches: &ArgMatches<'_>,
wallet_manager: &mut Option<Arc<RemoteWalletManager>>,
) -> Result<CliCommandInfo, CliError> {
let node_pubkey = pubkey_of_signer(matches, "node_pubkey", wallet_manager)?.unwrap();
let node_pubkey = pubkey_of_signer(matches, "node_pubkey", wallet_manager)?;
let mut our_localhost_port = value_t!(matches, "our_localhost", u16).ok();
// if there is no explicitly specified --our-localhost,
// disable the guess mode (= our_localhost_port)
if matches.occurrences_of("our_localhost") == 0 {
our_localhost_port = None
}
let node_json_rpc_url = value_t!(matches, "node_json_rpc_url", String).ok();
// requirement of node_pubkey is relaxed only if our_localhost_port
if our_localhost_port.is_none() && node_pubkey.is_none() {
return Err(CliError::BadParameter(
"OUR_VALIDATOR_PUBKEY (and possibly OUR_URL) must be specified \
unless --our-localhost is given"
.into(),
));
}
let follow = matches.is_present("follow");
let log = matches.is_present("log");
Ok(CliCommandInfo {
command: CliCommand::Catchup {
node_pubkey,
node_json_rpc_url,
follow,
our_localhost_port,
log,
},
signers: vec![],
})
@ -566,38 +599,76 @@ pub fn parse_transaction_history(
pub fn process_catchup(
rpc_client: &RpcClient,
config: &CliConfig,
node_pubkey: &Pubkey,
node_json_rpc_url: &Option<String>,
node_pubkey: Option<Pubkey>,
mut node_json_rpc_url: Option<String>,
follow: bool,
our_localhost_port: Option<u16>,
log: bool,
) -> ProcessResult {
let sleep_interval = 5;
let progress_bar = new_spinner_progress_bar();
progress_bar.set_message("Connecting...");
let node_client = if let Some(node_json_rpc_url) = node_json_rpc_url {
RpcClient::new(node_json_rpc_url.to_string())
} else {
let rpc_addr = loop {
let cluster_nodes = rpc_client.get_cluster_nodes()?;
if let Some(contact_info) = cluster_nodes
.iter()
.find(|contact_info| contact_info.pubkey == node_pubkey.to_string())
{
if let Some(rpc_addr) = contact_info.rpc {
break rpc_addr;
}
progress_bar.set_message(&format!("RPC service not found for {}", node_pubkey));
} else {
progress_bar.set_message(&format!(
"Contact information not found for {}",
node_pubkey
));
}
sleep(Duration::from_secs(sleep_interval as u64));
};
if let Some(our_localhost_port) = our_localhost_port {
let gussed_default = Some(format!("http://localhost:{}", our_localhost_port));
if node_json_rpc_url.is_some() && node_json_rpc_url != gussed_default {
// go to new line to leave this message on console
println!(
"Prefering explicitly given rpc ({}) as us, \
although --our-localhost is given\n",
node_json_rpc_url.as_ref().unwrap()
);
} else {
node_json_rpc_url = gussed_default;
}
}
RpcClient::new_socket(rpc_addr)
let (node_client, node_pubkey) = if our_localhost_port.is_some() {
let client = RpcClient::new(node_json_rpc_url.unwrap());
let guessed_default = Some(client.get_identity()?);
(
client,
(if node_pubkey.is_some() && node_pubkey != guessed_default {
// go to new line to leave this message on console
println!(
"Prefering explicitly given node pubkey ({}) as us, \
although --our-localhost is given\n",
node_pubkey.unwrap()
);
node_pubkey
} else {
guessed_default
})
.unwrap(),
)
} else if let Some(node_pubkey) = node_pubkey {
if let Some(node_json_rpc_url) = node_json_rpc_url {
(RpcClient::new(node_json_rpc_url), node_pubkey)
} else {
let rpc_addr = loop {
let cluster_nodes = rpc_client.get_cluster_nodes()?;
if let Some(contact_info) = cluster_nodes
.iter()
.find(|contact_info| contact_info.pubkey == node_pubkey.to_string())
{
if let Some(rpc_addr) = contact_info.rpc {
break rpc_addr;
}
progress_bar.set_message(&format!("RPC service not found for {}", node_pubkey));
} else {
progress_bar.set_message(&format!(
"Contact information not found for {}",
node_pubkey
));
}
sleep(Duration::from_secs(sleep_interval as u64));
};
(RpcClient::new_socket(rpc_addr), node_pubkey)
}
} else {
unreachable!()
};
let reported_node_pubkey = loop {
@ -614,7 +685,7 @@ pub fn process_catchup(
}
};
if reported_node_pubkey != *node_pubkey {
if reported_node_pubkey != node_pubkey {
return Err(format!(
"The identity reported by node RPC URL does not match. Expected: {:?}. Reported: {:?}",
node_pubkey, reported_node_pubkey
@ -622,15 +693,41 @@ pub fn process_catchup(
.into());
}
if rpc_client.get_identity()? == *node_pubkey {
if rpc_client.get_identity()? == node_pubkey {
return Err("Both RPC URLs reference the same node, unable to monitor for catchup. Try a different --url".into());
}
let mut previous_rpc_slot = std::u64::MAX;
let mut previous_slot_distance = 0;
let mut retry_count = 0;
let max_retry_count = 5;
let mut get_slot_while_retrying = |client: &RpcClient| {
loop {
match client.get_slot_with_commitment(config.commitment) {
Ok(r) => {
retry_count = 0;
return Ok(r);
}
Err(e) => {
if retry_count >= max_retry_count {
return Err(e);
}
retry_count += 1;
if log {
// go to new line to leave this message on console
println!("Retrying({}/{}): {}\n", retry_count, max_retry_count, e);
}
sleep(Duration::from_secs(1));
}
};
}
};
loop {
let rpc_slot = rpc_client.get_slot_with_commitment(config.commitment)?;
let node_slot = node_client.get_slot_with_commitment(config.commitment)?;
// humbly retry; the reference node (rpc_client) could be spotty,
// especially if pointing to api.meinnet-beta.solana.com at times
let rpc_slot = get_slot_while_retrying(rpc_client)?;
let node_slot = get_slot_while_retrying(&node_client)?;
if !follow && node_slot > std::cmp::min(previous_rpc_slot, rpc_slot) {
progress_bar.finish_and_clear();
return Ok(format!(
@ -653,15 +750,21 @@ pub fn process_catchup(
};
progress_bar.set_message(&format!(
"{} slots behind (us:{} them:{}){}",
slot_distance,
"{} slot(s) {} (us:{} them:{}){}",
slot_distance.abs(),
if slot_distance >= 0 {
"behind"
} else {
"ahead"
},
node_slot,
rpc_slot,
if slot_distance == 0 || previous_rpc_slot == std::u64::MAX {
"".to_string()
} else {
format!(
", {} at {:.1} slots/second{}",
", {} node is {} at {:.1} slots/second{}",
if slot_distance >= 0 { "our" } else { "their" },
if slots_per_second < 0.0 {
"falling behind"
} else {
@ -670,8 +773,11 @@ pub fn process_catchup(
slots_per_second,
time_remaining
)
}
},
));
if log {
println!();
}
sleep(Duration::from_secs(sleep_interval as u64));
previous_rpc_slot = rpc_slot;

View File

@ -1,5 +1,6 @@
/// Default port number for JSON RPC API
pub const DEFAULT_RPC_PORT: u16 = 8899;
pub const DEFAULT_RPC_PORT_STR: &str = "8899";
/// Default port number for JSON RPC pubsub
pub const DEFAULT_RPC_PUBSUB_PORT: u16 = 8900;