Shutdown all services before bailing replicator init (#4487)

automerge
This commit is contained in:
Sagar Dhawan 2019-05-30 14:36:47 -07:00 committed by Grimes
parent 045d4d5294
commit b8aff218e2
1 changed file with 38 additions and 21 deletions

View File

@@ -7,7 +7,7 @@ use crate::contact_info::ContactInfo;
use crate::gossip_service::GossipService; use crate::gossip_service::GossipService;
use crate::packet::to_shared_blob; use crate::packet::to_shared_blob;
use crate::repair_service::{RepairSlotRange, RepairStrategy}; use crate::repair_service::{RepairSlotRange, RepairStrategy};
use crate::result::Result; use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
use crate::streamer::{receiver, responder}; use crate::streamer::{receiver, responder};
use crate::window_service::WindowService; use crate::window_service::WindowService;
@@ -31,7 +31,7 @@ use solana_sdk::transaction::Transaction;
use solana_sdk::transport::TransportError; use solana_sdk::transport::TransportError;
use solana_storage_api::{get_segment_from_slot, storage_instruction, SLOTS_PER_SEGMENT}; use solana_storage_api::{get_segment_from_slot, storage_instruction, SLOTS_PER_SEGMENT};
use std::fs::File; use std::fs::File;
use std::io::{self, BufReader, Error, ErrorKind, Read, Seek, SeekFrom}; use std::io::{self, BufReader, ErrorKind, Read, Seek, SeekFrom};
use std::mem::size_of; use std::mem::size_of;
use std::net::{SocketAddr, UdpSocket}; use std::net::{SocketAddr, UdpSocket};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@@ -82,11 +82,11 @@ pub(crate) fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result<
let file_len = metadata.len(); let file_len = metadata.len();
if file_len < sample_size64 { if file_len < sample_size64 {
return Err(Error::new(ErrorKind::Other, "file too short!")); return Err(io::Error::new(ErrorKind::Other, "file too short!"));
} }
for offset in sample_offsets { for offset in sample_offsets {
if *offset > (file_len - sample_size64) / sample_size64 { if *offset > (file_len - sample_size64) / sample_size64 {
return Err(Error::new(ErrorKind::Other, "offset too large")); return Err(io::Error::new(ErrorKind::Other, "offset too large"));
} }
buffer_file.seek(SeekFrom::Start(*offset * sample_size64))?; buffer_file.seek(SeekFrom::Start(*offset * sample_size64))?;
trace!("sampling @ {} ", *offset); trace!("sampling @ {} ", *offset);
@@ -199,10 +199,28 @@ impl Replicator {
); );
info!("Connecting to the cluster via {:?}", cluster_entrypoint); info!("Connecting to the cluster via {:?}", cluster_entrypoint);
let (nodes, _) = crate::gossip_service::discover_cluster(&cluster_entrypoint.gossip, 1)?; let (nodes, _) =
match crate::gossip_service::discover_cluster(&cluster_entrypoint.gossip, 1) {
Ok(nodes_and_replicators) => nodes_and_replicators,
Err(e) => {
//shutdown services before exiting
exit.store(true, Ordering::Relaxed);
gossip_service.join()?;
return Err(Error::from(e));
}
};
let client = crate::gossip_service::get_client(&nodes); let client = crate::gossip_service::get_client(&nodes);
let (storage_blockhash, storage_slot) = Self::poll_for_blockhash_and_slot(&cluster_info)?; let (storage_blockhash, storage_slot) =
match Self::poll_for_blockhash_and_slot(&cluster_info) {
Ok(blockhash_and_slot) => blockhash_and_slot,
Err(e) => {
//shutdown services before exiting
exit.store(true, Ordering::Relaxed);
gossip_service.join()?;
return Err(e);
}
};
let signature = storage_keypair.sign(storage_blockhash.as_ref()); let signature = storage_keypair.sign(storage_blockhash.as_ref());
let slot = get_slot_from_blockhash(&signature, storage_slot); let slot = get_slot_from_blockhash(&signature, storage_slot);
@@ -232,7 +250,14 @@ impl Replicator {
|_, _, _| true, |_, _, _| true,
); );
Self::setup_mining_account(&client, &keypair, &storage_keypair)?; if let Err(e) = Self::setup_mining_account(&client, &keypair, &storage_keypair) {
//shutdown services before exiting
exit.store(true, Ordering::Relaxed);
gossip_service.join()?;
window_service.join()?;
fetch_stage.join()?;
return Err(e);
};
let mut thread_handles = let mut thread_handles =
create_request_processor(node.sockets.storage.unwrap(), &exit, slot); create_request_processor(node.sockets.storage.unwrap(), &exit, slot);
@@ -321,19 +346,11 @@ impl Replicator {
); );
let mut current_slot = start_slot; let mut current_slot = start_slot;
'outer: loop { 'outer: loop {
while let Ok(meta) = blocktree.meta(current_slot) { while blocktree.is_full(current_slot) {
if let Some(meta) = meta { current_slot += 1;
if meta.is_full() { info!("current slot: {}", current_slot);
current_slot += 1; if current_slot >= start_slot + SLOTS_PER_SEGMENT {
info!("current slot: {}", current_slot); break 'outer;
if current_slot >= start_slot + SLOTS_PER_SEGMENT {
break 'outer;
}
} else {
break;
}
} else {
break;
} }
} }
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
@@ -518,7 +535,7 @@ impl Replicator {
info!("waiting for segment..."); info!("waiting for segment...");
sleep(Duration::from_secs(5)); sleep(Duration::from_secs(5));
} }
Err(Error::new( Err(io::Error::new(
ErrorKind::Other, ErrorKind::Other,
"Couldn't get blockhash or slot", "Couldn't get blockhash or slot",
))? ))?