Add --expected-shred-version option

This commit is contained in:
Michael Vines 2020-01-28 16:56:55 -07:00
parent 1bc9a9c23b
commit bea9cd9684
3 changed files with 72 additions and 34 deletions

View File

@ -272,7 +272,7 @@ impl ClusterInfo {
let ip_addr = node.gossip.ip(); let ip_addr = node.gossip.ip();
format!( format!(
"{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5} | {:5}| {:5} | {:5}| {:5} | {:5}| {:5}| v{}\n", "{:15} {:2}| {:5} | {:44} | {:5}| {:5}| {:5} | {:5}| {:5} | {:5}| {:5} | {:5}| {:5}| {}\n",
if ContactInfo::is_valid_address(&node.gossip) { if ContactInfo::is_valid_address(&node.gossip) {
ip_addr.to_string() ip_addr.to_string()
} else { } else {

View File

@ -56,6 +56,7 @@ pub struct ValidatorConfig {
pub dev_sigverify_disabled: bool, pub dev_sigverify_disabled: bool,
pub dev_halt_at_slot: Option<Slot>, pub dev_halt_at_slot: Option<Slot>,
pub expected_genesis_hash: Option<Hash>, pub expected_genesis_hash: Option<Hash>,
pub expected_shred_version: Option<u16>,
pub voting_disabled: bool, pub voting_disabled: bool,
pub transaction_status_service_disabled: bool, pub transaction_status_service_disabled: bool,
pub blockstream_unix_socket: Option<PathBuf>, pub blockstream_unix_socket: Option<PathBuf>,
@ -77,6 +78,7 @@ impl Default for ValidatorConfig {
dev_sigverify_disabled: false, dev_sigverify_disabled: false,
dev_halt_at_slot: None, dev_halt_at_slot: None,
expected_genesis_hash: None, expected_genesis_hash: None,
expected_shred_version: None,
voting_disabled: false, voting_disabled: false,
transaction_status_service_disabled: false, transaction_status_service_disabled: false,
blockstream_unix_socket: None, blockstream_unix_socket: None,
@ -194,6 +196,16 @@ impl Validator {
compute_shred_version(&genesis_hash, &bank.hard_forks().read().unwrap()); compute_shred_version(&genesis_hash, &bank.hard_forks().read().unwrap());
Self::print_node_info(&node); Self::print_node_info(&node);
if let Some(expected_shred_version) = config.expected_shred_version {
if expected_shred_version != node.info.shred_version {
error!(
"shred version mismatch: expected {}",
expected_shred_version
);
process::exit(1);
}
}
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new( let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(
node.info.clone(), node.info.clone(),
keypair.clone(), keypair.clone(),

View File

@ -200,6 +200,7 @@ fn get_rpc_addr(
node: &Node, node: &Node,
identity_keypair: &Arc<Keypair>, identity_keypair: &Arc<Keypair>,
entrypoint_gossip: &SocketAddr, entrypoint_gossip: &SocketAddr,
expected_shred_version: Option<u16>,
) -> (RpcClient, SocketAddr) { ) -> (RpcClient, SocketAddr) {
let mut cluster_info = ClusterInfo::new( let mut cluster_info = ClusterInfo::new(
ClusterInfo::spy_contact_info(&identity_keypair.pubkey()), ClusterInfo::spy_contact_info(&identity_keypair.pubkey()),
@ -208,61 +209,74 @@ fn get_rpc_addr(
cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint_gossip)); cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint_gossip));
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
let exit = Arc::new(AtomicBool::new(false)); let gossip_exit_flag = Arc::new(AtomicBool::new(false));
let gossip_service = GossipService::new( let gossip_service = GossipService::new(
&cluster_info.clone(), &cluster_info.clone(),
None, None,
None, None,
node.sockets.gossip.try_clone().unwrap(), node.sockets.gossip.try_clone().unwrap(),
&exit, &gossip_exit_flag,
); );
let (rpc_client, rpc_addr) = loop { let (rpc_client, rpc_addr) = loop {
info!( info!(
"Searching for RPC service...\n{}", "Searching for RPC service, shred version={:?}...\n{}",
expected_shred_version,
cluster_info.read().unwrap().contact_info_trace() cluster_info.read().unwrap().contact_info_trace()
); );
let (gossip_peers, rpc_peers) = { let mut rpc_peers = cluster_info.read().unwrap().rpc_peers();
let cluster_info = cluster_info.read().unwrap();
(cluster_info.gossip_peers(), cluster_info.rpc_peers())
};
let found_entrypoint = gossip_peers let shred_version_required = !rpc_peers
.iter() .iter()
.any(|contact_info| contact_info.gossip == *entrypoint_gossip); .all(|contact_info| contact_info.shred_version == rpc_peers[0].shred_version);
if found_entrypoint & !rpc_peers.is_empty() { if let Some(expected_shred_version) = expected_shred_version {
let (id, rpc_addr) = { // Filter out rpc peers that don't match the expected shred version
// Prefer the entrypoint's RPC service if present, otherwise pick a node at random rpc_peers = rpc_peers
if let Some(contact_info) = rpc_peers .into_iter()
.iter() .filter(|contact_info| contact_info.shred_version == expected_shred_version)
.find(|contact_info| contact_info.gossip == *entrypoint_gossip) .collect::<Vec<_>>();
{ }
(contact_info.id, contact_info.rpc)
} else { if !rpc_peers.is_empty() {
let i = thread_rng().gen_range(0, rpc_peers.len()); // Prefer the entrypoint's RPC service if present, otherwise pick a node at random
(rpc_peers[i].id, rpc_peers[i].rpc) let contact_info = if let Some(contact_info) = rpc_peers
} .iter()
.find(|contact_info| contact_info.gossip == *entrypoint_gossip)
{
Some(contact_info.clone())
} else if shred_version_required {
// Require the user supply a shred version if there are conflicting shred version in
// gossip to reduce the chance of human error
warn!("Multiple shred versions detected, unable to select an RPC service. Restart with --expected-shred-version");
None
} else {
// Pick a node at random
Some(rpc_peers[thread_rng().gen_range(0, rpc_peers.len())].clone())
}; };
info!("Contacting RPC port of node {}: {:?}", id, rpc_addr); if let Some(ContactInfo { id, rpc, .. }) = contact_info {
let rpc_client = RpcClient::new_socket(rpc_addr); info!("Contacting RPC port of node {}: {:?}", id, rpc);
match rpc_client.get_version() { let rpc_client = RpcClient::new_socket(rpc);
Ok(rpc_version) => { match rpc_client.get_version() {
info!("RPC node version: {}", rpc_version.solana_core); Ok(rpc_version) => {
break (rpc_client, rpc_addr); info!("RPC node version: {}", rpc_version.solana_core);
} break (rpc_client, rpc);
Err(err) => { }
warn!("Failed to get RPC version: {}", err); Err(err) => {
warn!("Failed to get RPC version: {}", err);
}
} }
} }
} else {
info!("No RPC service found");
} }
sleep(Duration::from_secs(1)); sleep(Duration::from_secs(1));
}; };
exit.store(true, Ordering::Relaxed); gossip_exit_flag.store(true, Ordering::Relaxed);
gossip_service.join().unwrap(); gossip_service.join().unwrap();
(rpc_client, rpc_addr) (rpc_client, rpc_addr)
@ -575,6 +589,13 @@ pub fn main() {
.validator(hash_validator) .validator(hash_validator)
.help("Require the genesis have this hash"), .help("Require the genesis have this hash"),
) )
.arg(
Arg::with_name("expected_shred_version")
.long("expected-shred-version")
.value_name("VERSION")
.takes_value(true)
.help("Require the shred version be this value"),
)
.arg( .arg(
Arg::with_name("logfile") Arg::with_name("logfile")
.short("o") .short("o")
@ -654,6 +675,7 @@ pub fn main() {
expected_genesis_hash: matches expected_genesis_hash: matches
.value_of("expected_genesis_hash") .value_of("expected_genesis_hash")
.map(|s| Hash::from_str(&s).unwrap()), .map(|s| Hash::from_str(&s).unwrap()),
expected_shred_version: value_t!(matches, "expected_shred_version", u16).ok(),
new_hard_forks: hardforks_of(&matches, "hard_forks"), new_hard_forks: hardforks_of(&matches, "hard_forks"),
rpc_config: JsonRpcConfig { rpc_config: JsonRpcConfig {
enable_validator_exit: matches.is_present("enable_rpc_exit"), enable_validator_exit: matches.is_present("enable_rpc_exit"),
@ -866,8 +888,12 @@ pub fn main() {
); );
if !no_genesis_fetch { if !no_genesis_fetch {
let (rpc_client, rpc_addr) = let (rpc_client, rpc_addr) = get_rpc_addr(
get_rpc_addr(&node, &identity_keypair, &cluster_entrypoint.gossip); &node,
&identity_keypair,
&cluster_entrypoint.gossip,
validator_config.expected_shred_version,
);
download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| { download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| {
error!("Failed to initialize ledger: {}", err); error!("Failed to initialize ledger: {}", err);