// solana/core/src/fullnode.rs

//! The `fullnode` module hosts all the fullnode microservices.
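//!
//! A `Fullnode` wires the gossip service, the JSON RPC and PubSub services, the
//! PoH recorder/service, the TPU, and the TVU together into a single validator
//! process.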
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::blocktree_processor::{self, BankForksInfo};
use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
use crate::entry::create_ticks;
use crate::entry::next_entry_mut;
use crate::entry::Entry;
use crate::gossip_service::GossipService;
use crate::poh_recorder::PohRecorder;
use crate::poh_service::{PohService, PohServiceConfig};
use crate::rpc::JsonRpcConfig;
use crate::rpc_pubsub_service::PubSubService;
use crate::rpc_service::JsonRpcService;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::tpu::Tpu;
use crate::tvu::{Sockets, Tvu};
use solana_metrics::counter::Counter;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use solana_sdk::timing::timestamp;
use solana_vote_api::vote_transaction::VoteTransaction;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, RwLock};
use std::thread::sleep;
use std::thread::JoinHandle;
use std::thread::{spawn, Result};
use std::time::Duration;
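
/// Runtime configuration for a `Fullnode`.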
pub struct FullnodeConfig {
pub sigverify_disabled: bool,
pub voting_disabled: bool,
pub blockstream: Option<String>,
pub storage_rotate_count: u64,
pub tick_config: PohServiceConfig,
pub account_paths: Option<String>,
pub rpc_config: JsonRpcConfig,
}
impl Default for FullnodeConfig {
fn default() -> Self {
// TODO: remove this; it's a temporary parameter so test configurations can set
// the storage rotation count differently and tests don't take forever to run.
const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 1024;
Self {
sigverify_disabled: false,
voting_disabled: false,
blockstream: None,
storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
tick_config: PohServiceConfig::default(),
account_paths: None,
rpc_config: JsonRpcConfig::default(),
}
}
}
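
/// Handles to the services that make up a running validator node, plus the shared
/// exit flag used to shut them all down.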
pub struct Fullnode {
pub id: Pubkey,
exit: Arc<AtomicBool>,
rpc_service: Option<JsonRpcService>,
rpc_pubsub_service: Option<PubSubService>,
rpc_working_bank_handle: JoinHandle<()>,
gossip_service: GossipService,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_service: PohService,
tpu: Tpu,
tvu: Tvu,
}
impl Fullnode {
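/// Starts a fullnode from the ledger at `ledger_path`: replays the blocktree into
/// bank forks, then brings up PoH, gossip, the RPC/PubSub services, the TVU, and
/// the TPU. If `entrypoint_info_option` is given, that node is inserted into gossip
/// so this node can join an existing cluster.
///
/// A minimal usage sketch, mirroring the tests at the bottom of this file
/// (`ledger_path` and `entrypoint_info` are assumed to already exist):
///
/// ```ignore
/// let keypair = Arc::new(Keypair::new());
/// let node = Node::new_localhost_with_pubkey(keypair.pubkey());
/// let fullnode = Fullnode::new(
///     node,
///     &keypair,
///     &ledger_path,
///     Keypair::new(),
///     Some(&entrypoint_info),
///     &FullnodeConfig::default(),
/// );
/// fullnode.close().unwrap();
/// ```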
pub fn new<T>(
mut node: Node,
keypair: &Arc<Keypair>,
ledger_path: &str,
voting_keypair: T,
entrypoint_info_option: Option<&NodeInfo>,
config: &FullnodeConfig,
) -> Self
where
T: 'static + KeypairUtil + Sync + Send,
{
info!("creating bank...");
let id = keypair.pubkey();
assert_eq!(id, node.info.id);
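// Rebuild the bank state by replaying the ledger stored in the blocktree.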
let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
new_banks_from_blocktree(ledger_path, config.account_paths.clone());
let exit = Arc::new(AtomicBool::new(false));
let bank_info = &bank_forks_info[0];
let bank = bank_forks[bank_info.bank_slot].clone();
info!(
"starting PoH... {} {}",
bank.tick_height(),
bank.last_blockhash(),
);
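// The PoH recorder starts from the bank's current tick height and blockhash;
// PohService drives it according to `config.tick_config`.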
let (poh_recorder, entry_receiver) =
PohRecorder::new(bank.tick_height(), bank.last_blockhash());
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit);
info!("node info: {:?}", node.info);
info!("node entrypoint_info: {:?}", entrypoint_info_option);
info!(
"node local gossip address: {}",
node.sockets.gossip.local_addr().unwrap()
);
let blocktree = Arc::new(blocktree);
let bank_forks = Arc::new(RwLock::new(bank_forks));
node.info.wallclock = timestamp();
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_keypair(
node.info.clone(),
keypair.clone(),
)));
let storage_state = StorageState::new();
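// The RPC and PubSub services bind to the node's advertised ports on all
// interfaces (0.0.0.0).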
let rpc_service = JsonRpcService::new(
&cluster_info,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), node.info.rpc.port()),
storage_state.clone(),
config.rpc_config.clone(),
&exit,
);
let subscriptions = Arc::new(RpcSubscriptions::default());
let rpc_pubsub_service = PubSubService::new(
&subscriptions,
SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
node.info.rpc_pubsub.port(),
),
&exit,
);
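// Gossip is handed the blocktree and bank forks and runs over the node's gossip socket.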
let gossip_service = GossipService::new(
&cluster_info,
Some(blocktree.clone()),
Some(bank_forks.clone()),
node.sockets.gossip,
&exit,
);
// Insert the entrypoint info; it should only be None if this node
// is the bootstrap leader
if let Some(entrypoint_info) = entrypoint_info_option {
cluster_info
.write()
.unwrap()
.insert_info(entrypoint_info.clone());
}
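// The TVU gets its own clones of the repair, retransmit, and fetch (TVU) sockets.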
let sockets = Sockets {
repair: node
.sockets
.repair
.try_clone()
.expect("Failed to clone repair socket"),
retransmit: node
.sockets
.retransmit
.try_clone()
.expect("Failed to clone retransmit socket"),
fetch: node
.sockets
.tvu
.iter()
.map(|s| s.try_clone().expect("Failed to clone TVU Sockets"))
.collect(),
};
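// When voting is disabled, no voting keypair is handed to the TVU.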
let voting_keypair_option = if config.voting_disabled {
None
} else {
Some(Arc::new(voting_keypair))
};
// Start the TVU, which replays the ledger and (when a voting keypair is provided) votes
let tvu = Tvu::new(
voting_keypair_option,
&bank_forks,
&bank_forks_info,
&cluster_info,
sockets,
blocktree.clone(),
config.storage_rotate_count,
&storage_state,
config.blockstream.as_ref(),
ledger_signal_receiver,
&subscriptions,
&poh_recorder,
&exit,
);
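// The TPU is fed entries from the PoH recorder via `entry_receiver` and owns the
// node's TPU and broadcast sockets.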
let tpu = Tpu::new(
id,
&cluster_info,
&poh_recorder,
entry_receiver,
node.sockets.tpu,
node.sockets.broadcast,
config.sigverify_disabled,
&blocktree,
&exit,
);
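// Keep the RPC request processor pointed at the current working bank, polling
// bank_forks every 100ms until exit is signaled.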
let exit_ = exit.clone();
let bank_forks_ = bank_forks.clone();
let rpc_service_rp = rpc_service.request_processor.clone();
let rpc_working_bank_handle = spawn(move || loop {
if exit_.load(Ordering::Relaxed) {
break;
}
let bank = bank_forks_.read().unwrap().working_bank();
trace!("rpc working bank {} {}", bank.slot(), bank.last_blockhash());
rpc_service_rp.write().unwrap().set_bank(&bank);
let timer = Duration::from_millis(100);
sleep(timer);
});
inc_new_counter_info!("fullnode-new", 1);
Self {
id,
gossip_service,
rpc_service: Some(rpc_service),
rpc_pubsub_service: Some(rpc_pubsub_service),
rpc_working_bank_handle,
tpu,
tvu,
exit,
poh_service,
poh_recorder,
}
}
// Used to notify many nodes in parallel to exit; `exit` returns without joining the services
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
// Need to force the poh_recorder to drop the WorkingBank,
// which contains the channel to BroadcastStage. This should be
// sufficient as long as no other rotations are happening that
// can cause the Tpu to restart a BankingStage and reset a
// WorkingBank in poh_recorder. It follows that no other rotations can be
// in motion, because exit()/close() are only called by the run() loop,
// which is the sole initiator of rotations.
self.poh_recorder.lock().unwrap().clear_bank();
}
pub fn close(self) -> Result<()> {
self.exit();
self.join()
}
}
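
/// Opens the blocktree at `blocktree_path`, replays it into a set of bank forks, and
/// returns the forks, their metadata, the blocktree handle, and the blocktree's
/// ledger signal receiver.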
pub fn new_banks_from_blocktree(
blocktree_path: &str,
account_paths: Option<String>,
) -> (BankForks, Vec<BankForksInfo>, Blocktree, Receiver<bool>) {
let genesis_block =
GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block");
let (blocktree, ledger_signal_receiver) =
Blocktree::open_with_config_signal(blocktree_path, genesis_block.ticks_per_slot)
.expect("Expected to successfully open database ledger");
let (bank_forks, bank_forks_info) =
blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths)
.expect("process_blocktree failed");
(
bank_forks,
bank_forks_info,
blocktree,
ledger_signal_receiver,
)
}
impl Service for Fullnode {
type JoinReturnType = ();
fn join(self) -> Result<()> {
self.poh_service.join()?;
drop(self.poh_recorder);
if let Some(rpc_service) = self.rpc_service {
rpc_service.join()?;
}
if let Some(rpc_pubsub_service) = self.rpc_pubsub_service {
rpc_pubsub_service.join()?;
}
self.rpc_working_bank_handle.join()?;
self.gossip_service.join()?;
self.tpu.join()?;
self.tvu.join()?;
Ok(())
}
}
// Create entries such that the node identified by `active_keypair` will be added to the active set
// for leader selection, and append `num_ending_ticks` empty tick entries.
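// The returned entries are: a transfer funding `active_keypair`, the creation of a new
// vote account, a vote for `slot_to_vote_on`, and `num_ending_ticks` ticks; the new vote
// account's keypair is returned alongside the entries.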
pub fn make_active_set_entries(
active_keypair: &Arc<Keypair>,
lamport_source: &Keypair,
stake: u64,
slot_to_vote_on: u64,
blockhash: &Hash,
num_ending_ticks: u64,
) -> (Vec<Entry>, Keypair) {
// 1) Assume the active_keypair node has no lamports staked
let transfer_tx = SystemTransaction::new_account(
&lamport_source,
active_keypair.pubkey(),
stake,
*blockhash,
0,
);
let mut last_entry_hash = *blockhash;
let transfer_entry = next_entry_mut(&mut last_entry_hash, 1, vec![transfer_tx]);
// 2) Create and register a vote account for active_keypair
let voting_keypair = Keypair::new();
let vote_account_id = voting_keypair.pubkey();
let new_vote_account_tx = VoteTransaction::new_account(
active_keypair,
vote_account_id,
*blockhash,
stake.saturating_sub(2),
1,
);
let new_vote_account_entry = next_entry_mut(&mut last_entry_hash, 1, vec![new_vote_account_tx]);
// 3) Create vote entry
let vote_tx = VoteTransaction::new_vote(&voting_keypair, slot_to_vote_on, *blockhash, 0);
let vote_entry = next_entry_mut(&mut last_entry_hash, 1, vec![vote_tx]);
// 4) Create `num_ending_ticks` empty ticks
let mut entries = vec![transfer_entry, new_vote_account_entry, vote_entry];
let empty_ticks = create_ticks(num_ending_ticks, last_entry_hash);
entries.extend(empty_ticks);
(entries, voting_keypair)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::blocktree::create_new_tmp_ledger;
use std::fs::remove_dir_all;
#[test]
fn validator_exit() {
let leader_keypair = Keypair::new();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let validator_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
let (genesis_block, _mint_keypair) =
GenesisBlock::new_with_leader(10_000, leader_keypair.pubkey(), 1000);
let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
let validator = Fullnode::new(
validator_node,
&Arc::new(validator_keypair),
&validator_ledger_path,
Keypair::new(),
Some(&leader_node.info),
&FullnodeConfig::default(),
);
validator.close().unwrap();
remove_dir_all(validator_ledger_path).unwrap();
2018-07-02 11:20:35 -07:00
}
#[test]
fn validator_parallel_exit() {
let leader_keypair = Keypair::new();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let mut ledger_paths = vec![];
let validators: Vec<Fullnode> = (0..2)
.map(|_| {
let validator_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
let (genesis_block, _mint_keypair) =
GenesisBlock::new_with_leader(10_000, leader_keypair.pubkey(), 1000);
let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
ledger_paths.push(validator_ledger_path.clone());
Fullnode::new(
validator_node,
&Arc::new(validator_keypair),
&validator_ledger_path,
Keypair::new(),
Some(&leader_node.info),
&FullnodeConfig::default(),
)
})
.collect();
// Each validator can exit in parallel, which speeds up the many sequential calls to `join` below
validators.iter().for_each(|v| v.exit());
// While join is called sequentially, the exit calls above have already notified all the
// validators to exit from all their threads
validators.into_iter().for_each(|validator| {
validator.join().unwrap();
});
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
}
}