Rework fullnode boot sequence

This commit is contained in:
Michael Vines 2019-01-19 20:03:20 -08:00
parent 514bf32b99
commit 5b73a8eceb
2 changed files with 95 additions and 125 deletions

View File

@ -1,14 +1,10 @@
extern crate serde_json;
use clap::{crate_version, App, Arg, ArgMatches}; use clap::{crate_version, App, Arg, ArgMatches};
use log::*; use log::*;
use solana::client::mk_client; use solana::client::mk_client;
use solana::cluster_info::{Node, NodeInfo, FULLNODE_PORT_RANGE}; use solana::cluster_info::{Node, NodeInfo, FULLNODE_PORT_RANGE};
use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::fullnode::Fullnode;
use solana::leader_scheduler::LeaderScheduler; use solana::leader_scheduler::LeaderScheduler;
use solana::local_vote_signer_service::LocalVoteSignerService; use solana::local_vote_signer_service::LocalVoteSignerService;
use solana::service::Service;
use solana::socketaddr; use solana::socketaddr;
use solana::thin_client::{poll_gossip_for_leader, ThinClient}; use solana::thin_client::{poll_gossip_for_leader, ThinClient};
use solana::vote_signer_proxy::{RemoteVoteSigner, VoteSignerProxy}; use solana::vote_signer_proxy::{RemoteVoteSigner, VoteSignerProxy};
@ -58,10 +54,9 @@ fn create_and_fund_vote_account(
node_keypair: &Arc<Keypair>, node_keypair: &Arc<Keypair>,
) -> Result<()> { ) -> Result<()> {
let pubkey = node_keypair.pubkey(); let pubkey = node_keypair.pubkey();
let balance = client.poll_get_balance(&pubkey).unwrap_or(0); let node_balance = client.poll_get_balance(&pubkey)?;
info!("balance is {}", balance); info!("node balance is {}", node_balance);
if balance < 1 { if node_balance < 1 {
error!("insufficient tokens, one token required");
return Err(Error::new( return Err(Error::new(
ErrorKind::Other, ErrorKind::Other,
"insufficient tokens, one token required", "insufficient tokens, one token required",
@ -71,7 +66,7 @@ fn create_and_fund_vote_account(
// Create the vote account if necessary // Create the vote account if necessary
if client.poll_get_balance(&vote_account).unwrap_or(0) == 0 { if client.poll_get_balance(&vote_account).unwrap_or(0) == 0 {
// Need at least two tokens as one token will be spent on a vote_account_new() transaction // Need at least two tokens as one token will be spent on a vote_account_new() transaction
if balance < 2 { if node_balance < 2 {
error!("insufficient tokens, two tokens required"); error!("insufficient tokens, two tokens required");
return Err(Error::new( return Err(Error::new(
ErrorKind::Other, ErrorKind::Other,
@ -82,56 +77,45 @@ fn create_and_fund_vote_account(
let last_id = client.get_last_id(); let last_id = client.get_last_id();
let transaction = let transaction =
VoteTransaction::vote_account_new(node_keypair, vote_account, last_id, 1, 1); VoteTransaction::vote_account_new(node_keypair, vote_account, last_id, 1, 1);
if client.transfer_signed(&transaction).is_err() {
sleep(Duration::from_secs(2));
continue;
}
let balance = client.poll_get_balance(&vote_account).unwrap_or(0); match client.transfer_signed(&transaction) {
if balance > 0 { Ok(_) => match client.poll_get_balance(&vote_account) {
break; Ok(balance) => {
} info!("vote account balance: {}", balance);
break;
}
Err(e) => {
info!("Failed to get vote account balance: {:?}", e);
}
},
Err(e) => {
info!("Failed to send vote_account_new transaction: {:?}", e);
}
};
sleep(Duration::from_secs(2)); sleep(Duration::from_secs(2));
} }
} }
Ok(())
}
fn wait_for_vote_account_registeration( debug!("Checking for vote account registration");
client: &mut ThinClient, let vote_account_user_data = client.get_account_userdata(&vote_account);
vote_account: &Pubkey, if let Ok(Some(vote_account_user_data)) = vote_account_user_data {
node_id: Pubkey, if let Ok(vote_state) = VoteProgram::deserialize(&vote_account_user_data) {
) { if vote_state.node_id == pubkey {
loop { return Ok(());
let vote_account_user_data = client.get_account_userdata(vote_account);
if let Ok(Some(vote_account_user_data)) = vote_account_user_data {
if let Ok(vote_state) = VoteProgram::deserialize(&vote_account_user_data) {
if vote_state.node_id == node_id {
break;
}
}
}
panic!("Expected successful vote account registration");
}
}
fn run_forever_and_do_role_transition(fullnode: &mut Fullnode) {
loop {
let status = fullnode.handle_role_transition();
match status {
Ok(Some(FullnodeReturnType::LeaderToValidatorRotation)) => (),
Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)) => (),
_ => {
// Fullnode tpu/tvu exited for some unexpected reason
return;
} }
} }
} }
Err(Error::new(
ErrorKind::Other,
"expected successful vote account registration",
))
} }
fn main() { fn main() {
solana_logger::setup(); solana_logger::setup();
solana_metrics::set_panic_hook("fullnode"); solana_metrics::set_panic_hook("fullnode");
let matches = App::new("fullnode") let matches = App::new("fullnode")
.version(crate_version!()) .version(crate_version!())
.arg( .arg(
@ -159,7 +143,7 @@ fn main() {
.long("network") .long("network")
.value_name("HOST:PORT") .value_name("HOST:PORT")
.takes_value(true) .takes_value(true)
.help("Rendezvous with the network at this gossip entry point"), .help("Rendezvous with the cluster at this gossip entry point"),
) )
.arg( .arg(
Arg::with_name("signer") Arg::with_name("signer")
@ -187,69 +171,49 @@ fn main() {
) )
.get_matches(); .get_matches();
let nosigverify = matches.is_present("nosigverify"); let no_sigverify = matches.is_present("nosigverify");
let use_only_bootstrap_leader = matches.is_present("no-leader-rotation"); let use_only_bootstrap_leader = matches.is_present("no-leader-rotation");
let (keypair, gossip) = parse_identity(&matches); let (keypair, gossip) = parse_identity(&matches);
let ledger_path = matches.value_of("ledger").unwrap(); let ledger_path = matches.value_of("ledger").unwrap();
let cluster_entrypoint = matches
// socketaddr that is initial pointer into the network's gossip
let network = matches
.value_of("network") .value_of("network")
.map(|network| network.parse().expect("failed to parse network address")); .map(|network| network.parse().expect("failed to parse network address"));
let (_signer_service, signer_addr) = if let Some(signer_addr) = matches.value_of("signer") {
let node = Node::new_with_external_ip(keypair.pubkey(), &gossip); (
None,
let keypair = Arc::new(keypair); signer_addr.to_string().parse().expect("Signer IP Address"),
let pubkey = keypair.pubkey(); )
} else {
let mut leader_scheduler = LeaderScheduler::default(); // Run a local vote signer if a vote signer service address was not provided
leader_scheduler.use_only_bootstrap_leader = use_only_bootstrap_leader; let (signer_service, signer_addr) = LocalVoteSignerService::new();
(Some(signer_service), signer_addr)
};
let rpc_port = if let Some(port) = matches.value_of("rpc_port") { let rpc_port = if let Some(port) = matches.value_of("rpc_port") {
let port_number = port.to_string().parse().expect("integer"); let port_number = port.to_string().parse().expect("integer");
if port_number == 0 { if port_number == 0 {
eprintln!("Invalid RPC port requested: {:?}", port); eprintln!("Invalid RPC port requested: {:?}", port);
exit(1); exit(1);
} }
Some(port_number) port_number
} else { } else {
match solana_netutil::find_available_port_in_range(FULLNODE_PORT_RANGE) { solana_netutil::find_available_port_in_range(FULLNODE_PORT_RANGE)
Ok(port) => Some(port), .expect("unable to allocate rpc port")
Err(_) => None,
}
}; };
let leader = match network { let keypair = Arc::new(keypair);
Some(network) => { let node = Node::new_with_external_ip(keypair.pubkey(), &gossip);
poll_gossip_for_leader(network, None).expect("can't find leader on network") let mut node_info = node.info.clone();
} node_info.rpc.set_port(rpc_port);
None => { node_info.rpc_pubsub.set_port(rpc_port + 1);
//self = leader
let mut node_info = node.info.clone();
if rpc_port.is_some() {
node_info.rpc.set_port(rpc_port.unwrap());
node_info.rpc_pubsub.set_port(rpc_port.unwrap() + 1);
}
node_info
}
};
let (signer_service, signer) = if let Some(signer_addr) = matches.value_of("signer") { let mut leader_scheduler = LeaderScheduler::default();
( leader_scheduler.use_only_bootstrap_leader = use_only_bootstrap_leader;
None,
signer_addr.to_string().parse().expect("Signer IP Address"),
)
} else {
// If a remote vote-signer service is not provided, run a local instance
let (signer_service, addr) = LocalVoteSignerService::new();
(Some(signer_service), addr)
};
let mut client = mk_client(&leader); let vote_signer = VoteSignerProxy::new(&keypair, Box::new(RemoteVoteSigner::new(signer_addr)));
let vote_signer = VoteSignerProxy::new(&keypair, Box::new(RemoteVoteSigner::new(signer)));
let vote_account = vote_signer.vote_account; let vote_account = vote_signer.vote_account;
info!("Node ID: {}", node.info.id);
info!("Signer service address: {:?}", signer_addr);
info!("New vote account ID is {:?}", vote_account); info!("New vote account ID is {:?}", vote_account);
let mut fullnode = Fullnode::new( let mut fullnode = Fullnode::new(
@ -257,25 +221,44 @@ fn main() {
ledger_path, ledger_path,
keypair.clone(), keypair.clone(),
Arc::new(vote_signer), Arc::new(vote_signer),
network, cluster_entrypoint,
nosigverify, no_sigverify,
leader_scheduler, leader_scheduler,
rpc_port, Some(rpc_port),
); );
if create_and_fund_vote_account(&mut client, vote_account, &keypair).is_err() { {
if let Some(signer_service) = signer_service { let leader_node_info = loop {
signer_service.join().unwrap(); info!("Looking for leader...");
match poll_gossip_for_leader(node_info.gossip, Some(10)) {
Ok(leader_node_info) => {
info!("Found leader: {:?}", leader_node_info);
break leader_node_info;
}
Err(err) => {
info!("Unable to find leader: {:?}", err);
}
};
};
let mut client = mk_client(&leader_node_info);
if let Err(err) = create_and_fund_vote_account(&mut client, vote_account, &keypair) {
panic!("Failed to create_and_fund_vote_account: {:?}", err);
} }
exit(1);
} }
wait_for_vote_account_registeration(&mut client, &vote_account, pubkey); loop {
run_forever_and_do_role_transition(&mut fullnode); let status = fullnode.handle_role_transition();
match status {
if let Some(signer_service) = signer_service { Ok(Some(transition)) => {
signer_service.join().unwrap(); info!("role_transition complete: {:?}", transition);
}
_ => {
panic!(
"Fullnode TPU/TVU exited for some unexpected reason: {:?}",
status
);
}
};
} }
exit(1);
} }

View File

@ -151,7 +151,6 @@ impl Fullnode {
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
info!("creating bank..."); info!("creating bank...");
let db_ledger = Self::make_db_ledger(ledger_path); let db_ledger = Self::make_db_ledger(ledger_path);
let (bank, entry_height, last_entry_id) = let (bank, entry_height, last_entry_id) =
Self::new_bank_from_db_ledger(&db_ledger, leader_scheduler); Self::new_bank_from_db_ledger(&db_ledger, leader_scheduler);
@ -167,9 +166,11 @@ impl Fullnode {
if let Some(port) = rpc_port { if let Some(port) = rpc_port {
rpc_addr.set_port(port); rpc_addr.set_port(port);
} }
info!("node rpc address: {}", rpc_addr);
info!("node leader_addr: {:?}", leader_addr);
let leader_info = leader_addr.map(|i| NodeInfo::new_entry_point(&i)); let leader_info = leader_addr.map(|i| NodeInfo::new_entry_point(&i));
let server = Self::new_with_bank( Self::new_with_bank(
keypair, keypair,
vote_signer, vote_signer,
bank, bank,
@ -182,21 +183,7 @@ impl Fullnode {
sigverify_disabled, sigverify_disabled,
rpc_port, rpc_port,
storage_rotate_count, storage_rotate_count,
); )
match leader_addr {
Some(leader_addr) => {
info!(
"validator ready... rpc address: {}, connected to: {}",
rpc_addr, leader_addr
);
}
None => {
info!("leader ready... rpc address: {}", rpc_addr);
}
}
server
} }
/// Create a fullnode instance acting as a leader or validator. /// Create a fullnode instance acting as a leader or validator.