From b8aff218e24f951f12032f0787f284dcb6440606 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Thu, 30 May 2019 14:36:47 -0700 Subject: [PATCH] Shutdown all services before bailing replicator init (#4487) automerge --- core/src/replicator.rs | 59 +++++++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 49bf2f769c..f752a9d48e 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -7,7 +7,7 @@ use crate::contact_info::ContactInfo; use crate::gossip_service::GossipService; use crate::packet::to_shared_blob; use crate::repair_service::{RepairSlotRange, RepairStrategy}; -use crate::result::Result; +use crate::result::{Error, Result}; use crate::service::Service; use crate::streamer::{receiver, responder}; use crate::window_service::WindowService; @@ -31,7 +31,7 @@ use solana_sdk::transaction::Transaction; use solana_sdk::transport::TransportError; use solana_storage_api::{get_segment_from_slot, storage_instruction, SLOTS_PER_SEGMENT}; use std::fs::File; -use std::io::{self, BufReader, Error, ErrorKind, Read, Seek, SeekFrom}; +use std::io::{self, BufReader, ErrorKind, Read, Seek, SeekFrom}; use std::mem::size_of; use std::net::{SocketAddr, UdpSocket}; use std::path::{Path, PathBuf}; @@ -82,11 +82,11 @@ pub(crate) fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result< let file_len = metadata.len(); if file_len < sample_size64 { - return Err(Error::new(ErrorKind::Other, "file too short!")); + return Err(io::Error::new(ErrorKind::Other, "file too short!")); } for offset in sample_offsets { if *offset > (file_len - sample_size64) / sample_size64 { - return Err(Error::new(ErrorKind::Other, "offset too large")); + return Err(io::Error::new(ErrorKind::Other, "offset too large")); } buffer_file.seek(SeekFrom::Start(*offset * sample_size64))?; trace!("sampling @ {} ", *offset); @@ -199,10 +199,28 @@ impl Replicator { ); info!("Connecting to the cluster via {:?}", cluster_entrypoint); - let (nodes, _) = crate::gossip_service::discover_cluster(&cluster_entrypoint.gossip, 1)?; + let (nodes, _) = + match crate::gossip_service::discover_cluster(&cluster_entrypoint.gossip, 1) { + Ok(nodes_and_replicators) => nodes_and_replicators, + Err(e) => { + //shutdown services before exiting + exit.store(true, Ordering::Relaxed); + gossip_service.join()?; + return Err(Error::from(e)); + } + }; let client = crate::gossip_service::get_client(&nodes); - let (storage_blockhash, storage_slot) = Self::poll_for_blockhash_and_slot(&cluster_info)?; + let (storage_blockhash, storage_slot) = + match Self::poll_for_blockhash_and_slot(&cluster_info) { + Ok(blockhash_and_slot) => blockhash_and_slot, + Err(e) => { + //shutdown services before exiting + exit.store(true, Ordering::Relaxed); + gossip_service.join()?; + return Err(e); + } + }; let signature = storage_keypair.sign(storage_blockhash.as_ref()); let slot = get_slot_from_blockhash(&signature, storage_slot); @@ -232,7 +250,14 @@ impl Replicator { |_, _, _| true, ); - Self::setup_mining_account(&client, &keypair, &storage_keypair)?; + if let Err(e) = Self::setup_mining_account(&client, &keypair, &storage_keypair) { + //shutdown services before exiting + exit.store(true, Ordering::Relaxed); + gossip_service.join()?; + window_service.join()?; + fetch_stage.join()?; + return Err(e); + }; let mut thread_handles = create_request_processor(node.sockets.storage.unwrap(), &exit, slot); @@ -321,19 +346,11 @@ impl Replicator { ); let mut current_slot = start_slot; 'outer: loop { - while let Ok(meta) = blocktree.meta(current_slot) { - if let Some(meta) = meta { - if meta.is_full() { - current_slot += 1; - info!("current slot: {}", current_slot); - if current_slot >= start_slot + SLOTS_PER_SEGMENT { - break 'outer; - } - } else { - break; - } - } else { - break; + while blocktree.is_full(current_slot) { + current_slot += 1; + info!("current slot: {}", current_slot); + if current_slot >= start_slot + SLOTS_PER_SEGMENT { + break 'outer; } } if exit.load(Ordering::Relaxed) { @@ -518,7 +535,7 @@ impl Replicator { info!("waiting for segment..."); sleep(Duration::from_secs(5)); } - Err(Error::new( + Err(io::Error::new( ErrorKind::Other, "Couldn't get blockhash or slot", ))?