This commit is contained in:
Michael Vines 2020-02-23 18:39:36 -07:00
parent 394933e53c
commit 5c5a06198c
1 changed files with 136 additions and 114 deletions

View File

@ -197,14 +197,65 @@ fn download_tar_bz2(
Ok(())
}
fn get_rpc_addr(
fn get_shred_rpc_peers(
cluster_info: &Arc<RwLock<ClusterInfo>>,
expected_shred_version: Option<u16>,
) -> Vec<ContactInfo> {
let rpc_peers = cluster_info.read().unwrap().all_rpc_peers();
match expected_shred_version {
Some(expected_shred_version) => {
// Filter out rpc peers that don't match the expected shred version
rpc_peers
.into_iter()
.filter(|contact_info| contact_info.shred_version == expected_shred_version)
.collect::<Vec<_>>()
}
None => {
if !rpc_peers
.iter()
.all(|contact_info| contact_info.shred_version == rpc_peers[0].shred_version)
{
eprintln!(
"Multiple shred versions observed in gossip. Restart with --expected-shred-version"
);
exit(1);
}
rpc_peers
}
}
}
fn get_trusted_snapshot_hashes(
cluster_info: &Arc<RwLock<ClusterInfo>>,
trusted_validators: Option<&HashSet<Pubkey>>,
) -> Option<HashSet<(Slot, Hash)>> {
if let Some(trusted_validators) = trusted_validators {
let mut trusted_snapshot_hashes = HashSet::new();
for trusted_validator in trusted_validators {
if let Some(snapshot_hashes) = cluster_info
.read()
.unwrap()
.get_snapshot_hash_for_node(trusted_validator)
{
for snapshot_hash in snapshot_hashes {
trusted_snapshot_hashes.insert(*snapshot_hash);
}
}
}
Some(trusted_snapshot_hashes)
} else {
None
}
}
fn get_rpc_node(
node: &Node,
identity_keypair: &Arc<Keypair>,
entrypoint_gossip: &SocketAddr,
expected_shred_version: Option<u16>,
trusted_validators: Option<&HashSet<Pubkey>>,
snapshot_not_required: bool,
) -> (RpcClient, SocketAddr) {
) -> (ContactInfo, RpcClient, Option<(Slot, Hash)>) {
let mut cluster_info = ClusterInfo::new(
ClusterInfo::spy_contact_info(&identity_keypair.pubkey()),
identity_keypair.clone(),
@ -220,140 +271,109 @@ fn get_rpc_addr(
&gossip_exit_flag,
);
let (rpc_client, rpc_addr) = loop {
let (rpc_contact_info, rpc_client, selected_snapshot_hash) = loop {
info!(
"Searching for an RPC service, shred version={:?}...\n{}",
expected_shred_version,
cluster_info.read().unwrap().contact_info_trace()
"Searching for an RPC service, shred version={:?}...",
expected_shred_version
);
sleep(Duration::from_secs(1));
info!("\n{}", cluster_info.read().unwrap().contact_info_trace());
let mut rpc_peers = cluster_info.read().unwrap().all_rpc_peers();
match expected_shred_version {
Some(expected_shred_version) => {
// Filter out rpc peers that don't match the expected shred version
rpc_peers = rpc_peers
.into_iter()
.filter(|contact_info| contact_info.shred_version == expected_shred_version)
.collect::<Vec<_>>();
}
None => {
if !rpc_peers
.iter()
.all(|contact_info| contact_info.shred_version == rpc_peers[0].shred_version)
{
eprintln!(
"Multiple shred versions observed in gossip. Restart with --expected-shred-version"
);
exit(1);
}
}
let rpc_peers = get_shred_rpc_peers(&cluster_info, expected_shred_version);
if rpc_peers.is_empty() {
info!("No RPC services found");
continue;
}
let trusted_slots = if let Some(trusted_validators) = trusted_validators {
let mut trusted_slots = HashSet::new();
for trusted_validator in trusted_validators {
let mut highest_snapshot_hash: Option<(Slot, Hash)> = None;
let eligible_rpc_peers = if snapshot_not_required {
rpc_peers
} else {
let trusted_snapshot_hashes =
get_trusted_snapshot_hashes(&cluster_info, trusted_validators);
let mut eligible_rpc_peers = vec![];
for rpc_peer in rpc_peers.iter() {
if let Some(snapshot_hashes) = cluster_info
.read()
.unwrap()
.get_snapshot_hash_for_node(trusted_validator)
.get_snapshot_hash_for_node(&rpc_peer.id)
{
for snapshot_hash in snapshot_hashes {
trusted_slots.insert(*snapshot_hash);
}
}
}
Some(trusted_slots)
} else {
None
};
if rpc_peers.is_empty() {
info!("No RPC services found ");
} else {
let eligible_rpc_peers = if snapshot_not_required {
rpc_peers
} else {
let mut eligible_rpc_peers = vec![];
let mut highest_snapshot_slot = 0;
for rpc_peer in rpc_peers.iter() {
if let Some(snapshot_hash) = cluster_info
.read()
.unwrap()
.get_snapshot_hash_for_node(&rpc_peer.id)
{
let highest_snapshot_slot_for_node =
snapshot_hash.iter().fold(0, |highest_slot, snapshot_hash| {
if let Some(ref trusted_slots) = trusted_slots {
if !trusted_slots.contains(snapshot_hash) {
// Ignore all untrusted slots
return highest_slot;
}
}
highest_slot.max(snapshot_hash.0)
});
if highest_snapshot_slot_for_node > 0 {
if highest_snapshot_slot_for_node > highest_snapshot_slot {
// Found a higher snapshot, remove all rpc peers with a lower snapshot
eligible_rpc_peers.clear();
highest_snapshot_slot = highest_snapshot_slot_for_node;
if let Some(ref trusted_snapshot_hashes) = trusted_snapshot_hashes {
if !trusted_snapshot_hashes.contains(snapshot_hash) {
// Ignore all untrusted snapshot hashes
continue;
}
}
if highest_snapshot_slot_for_node == highest_snapshot_slot {
eligible_rpc_peers.push(rpc_peer.clone());
}
if highest_snapshot_hash.is_none()
|| snapshot_hash.0 > highest_snapshot_hash.unwrap().0
{
// Found a higher snapshot, remove all nodes with a lower snapshot
eligible_rpc_peers.clear();
highest_snapshot_hash = Some(*snapshot_hash)
}
if Some(*snapshot_hash) == highest_snapshot_hash {
eligible_rpc_peers.push(rpc_peer.clone());
}
}
}
}
if highest_snapshot_slot == 0 {
match highest_snapshot_hash {
None => {
assert!(eligible_rpc_peers.is_empty());
info!("No snapshot available");
} else {
info!("No snapshots available");
}
Some(highest_snapshot_hash) => {
info!(
"Highest available snapshot slot is {}",
highest_snapshot_slot
"Highest available snapshot slot is {}, available from {:?}",
highest_snapshot_hash.0,
eligible_rpc_peers
.iter()
.map(|contact_info| contact_info.id)
.collect::<Vec<_>>()
);
}
eligible_rpc_peers
}
eligible_rpc_peers
};
if !eligible_rpc_peers.is_empty() {
// Prefer the entrypoint's RPC service if present, otherwise pick one at random
let contact_info = if let Some(contact_info) = eligible_rpc_peers
.iter()
.find(|contact_info| contact_info.gossip == *entrypoint_gossip)
{
contact_info
} else {
&eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())]
};
if !eligible_rpc_peers.is_empty() {
// Prefer the entrypoint's RPC service if present, otherwise pick one at random
let contact_info = if let Some(contact_info) = eligible_rpc_peers
.iter()
.find(|contact_info| contact_info.gossip == *entrypoint_gossip)
{
contact_info
} else {
&eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())]
};
info!(
"Trying RPC service from node {}: {:?}",
contact_info.id, contact_info.rpc
);
let rpc_client = RpcClient::new_socket(contact_info.rpc);
match rpc_client.get_version() {
Ok(rpc_version) => {
info!("RPC node version: {}", rpc_version.solana_core);
break (rpc_client, contact_info.rpc);
}
Err(err) => {
warn!("Failed to get RPC node's version: {}", err);
}
info!(
"Trying RPC service from node {}: {:?}",
contact_info.id, contact_info.rpc
);
let rpc_client = RpcClient::new_socket(contact_info.rpc);
match rpc_client.get_version() {
Ok(rpc_version) => {
info!("RPC node version: {}", rpc_version.solana_core);
break (contact_info.clone(), rpc_client, highest_snapshot_hash);
}
Err(err) => {
warn!("Failed to get RPC node's version: {}", err);
}
}
}
sleep(Duration::from_secs(1));
};
gossip_exit_flag.store(true, Ordering::Relaxed);
gossip_service.join().unwrap();
(rpc_client, rpc_addr)
(rpc_contact_info, rpc_client, selected_snapshot_hash)
}
fn check_vote_account(
@ -409,11 +429,11 @@ fn check_vote_account(
fn download_ledger(
rpc_addr: &SocketAddr,
ledger_path: &Path,
no_snapshot_fetch: bool,
snapshot_hash: Option<(Slot, Hash)>,
) -> Result<(), String> {
download_tar_bz2(rpc_addr, "genesis.tar.bz2", ledger_path, false)?;
if !no_snapshot_fetch {
if snapshot_hash.is_some() {
let snapshot_package =
solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path);
if snapshot_package.exists() {
@ -998,7 +1018,7 @@ pub fn main() {
);
if !no_genesis_fetch {
let (rpc_client, rpc_addr) = get_rpc_addr(
let (rpc_contact_info, rpc_client, snapshot_hash) = get_rpc_node(
&node,
&identity_keypair,
&cluster_entrypoint.gossip,
@ -1012,11 +1032,6 @@ pub fn main() {
no_snapshot_fetch,
);
download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| {
error!("Failed to initialize ledger: {}", err);
exit(1);
});
let genesis_hash = rpc_client.get_genesis_hash().unwrap_or_else(|err| {
error!("Failed to get genesis hash: {}", err);
exit(1);
@ -1045,6 +1060,13 @@ pub fn main() {
exit(1);
});
}
download_ledger(&rpc_contact_info.rpc, &ledger_path, snapshot_hash).unwrap_or_else(
|err| {
error!("Failed to initialize ledger: {}", err);
exit(1);
},
);
}
}