From 2dbe8fc1a91056b633eab2029a414b05a7fbb17d Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 10 Jan 2019 09:21:38 -0800 Subject: [PATCH] Refactor vote signer code (#2368) * Refactor vote signer code * fixed test compilation errors * address clippy errors * fix missing macro_use * move macro use * review comments --- fullnode/src/main.rs | 95 +++++----- src/compute_leader_confirmation_service.rs | 41 ++-- src/create_vote_account.rs | 61 ------ src/fullnode.rs | 35 ++-- src/leader_scheduler.rs | 210 +++++++-------------- src/lib.rs | 4 +- src/local_vote_signer_service.rs | 44 +++++ src/replay_stage.rs | 76 +++----- src/result.rs | 8 +- src/vote_signer_proxy.rs | 143 ++++++++++++++ src/vote_stage.rs | 126 ------------- tests/multinode.rs | 132 ++++++------- tests/replicator.rs | 37 ++-- 13 files changed, 433 insertions(+), 579 deletions(-) delete mode 100644 src/create_vote_account.rs create mode 100644 src/local_vote_signer_service.rs create mode 100644 src/vote_signer_proxy.rs delete mode 100644 src/vote_stage.rs diff --git a/fullnode/src/main.rs b/fullnode/src/main.rs index c4cf47fe8c..bf2749426a 100644 --- a/fullnode/src/main.rs +++ b/fullnode/src/main.rs @@ -1,4 +1,3 @@ -#[macro_use] extern crate serde_json; use clap::{crate_version, App, Arg}; @@ -6,14 +5,14 @@ use log::*; use solana::client::mk_client; use solana::cluster_info::{Node, NodeInfo, FULLNODE_PORT_RANGE}; -use solana::create_vote_account; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::leader_scheduler::LeaderScheduler; -use solana::rpc_request::{RpcClient, RpcRequest}; +use solana::local_vote_signer_service::LocalVoteSignerService; +use solana::service::Service; use solana::socketaddr; use solana::thin_client::poll_gossip_for_leader; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; +use solana::vote_signer_proxy::VoteSignerProxy; +use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::vote_program::VoteProgram; use solana_sdk::vote_transaction::VoteTransaction; use std::fs::File; @@ -118,35 +117,11 @@ fn main() { .value_of("network") .map(|network| network.parse().expect("failed to parse network address")); - let (signer, t_signer, signer_exit) = if let Some(signer_addr) = matches.value_of("signer") { - ( - signer_addr.to_string().parse().expect("Signer IP Address"), - None, - None, - ) - } else { - // If a remote vote-signer service is not provided, run a local instance - let (signer, t_signer, signer_exit) = create_vote_account::local_vote_signer_service() - .expect("Failed to start vote signer service"); - (signer, Some(t_signer), Some(signer_exit)) - }; - let node = Node::new_with_external_ip(keypair.pubkey(), &gossip); // save off some stuff for airdrop let mut node_info = node.info.clone(); - let rpc_client = RpcClient::new_from_socket(signer); - let msg = "Registering a new node"; - let sig = Signature::new(&keypair.sign(msg.as_bytes()).as_ref()); - - let params = json!([keypair.pubkey(), sig, msg.as_bytes()]); - let resp = RpcRequest::RegisterNode - .retry_make_rpc_request(&rpc_client, 1, Some(params), 5) - .expect("Failed to register node"); - let vote_account_id: Pubkey = - serde_json::from_value(resp).expect("Invalid register node response"); - info!("Vote account ID is {:?}", vote_account_id); let keypair = Arc::new(keypair); let pubkey = keypair.pubkey(); @@ -157,11 +132,6 @@ fn main() { let port_number = port.to_string().parse().expect("integer"); if port_number == 0 { eprintln!("Invalid RPC port requested: {:?}", port); - if let Some(t) = t_signer { - if let Some(exit) = signer_exit { - create_vote_account::stop_local_vote_signer_service(t, &exit); - } - } exit(1); } Some(port_number) @@ -186,53 +156,74 @@ fn main() { } }; + let (signer_service, signer) = if let Some(signer_addr) = matches.value_of("signer") { + ( + None, + signer_addr.to_string().parse().expect("Signer IP Address"), + ) + } else { + // If a remote vote-signer service is not provided, run a local instance + let (signer_service, addr) = LocalVoteSignerService::new(); + (Some(signer_service), addr) + }; + + let mut client = mk_client(&leader); + let vote_signer = VoteSignerProxy::new(&keypair, signer); + info!("New vote account ID is {:?}", vote_signer.vote_account); + let mut fullnode = Fullnode::new( node, ledger_path, keypair.clone(), - &vote_account_id, + &vote_signer.vote_account, &signer, network, nosigverify, leader_scheduler, rpc_port, ); - let mut client = mk_client(&leader); let balance = client.poll_get_balance(&pubkey).unwrap_or(0); info!("balance is {}", balance); if balance < 1 { error!("insufficient tokens, one token required"); - if let Some(t) = t_signer { - if let Some(exit) = signer_exit { - create_vote_account::stop_local_vote_signer_service(t, &exit); - } + if let Some(signer_service) = signer_service { + signer_service.join().unwrap(); } exit(1); } // Create the vote account if necessary - if client.poll_get_balance(&vote_account_id).unwrap_or(0) == 0 { + if client + .poll_get_balance(&vote_signer.vote_account) + .unwrap_or(0) + == 0 + { // Need at least two tokens as one token will be spent on a vote_account_new() transaction if balance < 2 { error!("insufficient tokens, two tokens required"); - if let Some(t) = t_signer { - if let Some(exit) = signer_exit { - create_vote_account::stop_local_vote_signer_service(t, &exit); - } + if let Some(signer_service) = signer_service { + signer_service.join().unwrap(); } exit(1); } loop { let last_id = client.get_last_id(); - let transaction = - VoteTransaction::vote_account_new(&keypair, vote_account_id, last_id, 1, 1); + let transaction = VoteTransaction::vote_account_new( + &keypair, + vote_signer.vote_account, + last_id, + 1, + 1, + ); if client.transfer_signed(&transaction).is_err() { sleep(Duration::from_secs(2)); continue; } - let balance = client.poll_get_balance(&vote_account_id).unwrap_or(0); + let balance = client + .poll_get_balance(&vote_signer.vote_account) + .unwrap_or(0); if balance > 0 { break; } @@ -241,7 +232,7 @@ fn main() { } loop { - let vote_account_user_data = client.get_account_userdata(&vote_account_id); + let vote_account_user_data = client.get_account_userdata(&vote_signer.vote_account); if let Ok(Some(vote_account_user_data)) = vote_account_user_data { if let Ok(vote_state) = VoteProgram::deserialize(&vote_account_user_data) { if vote_state.node_id == pubkey { @@ -260,10 +251,8 @@ fn main() { _ => { // Fullnode tpu/tvu exited for some unexpected // reason, so exit - if let Some(t) = t_signer { - if let Some(exit) = signer_exit { - create_vote_account::stop_local_vote_signer_service(t, &exit); - } + if let Some(signer_service) = signer_service { + signer_service.join().unwrap(); } exit(1); } diff --git a/src/compute_leader_confirmation_service.rs b/src/compute_leader_confirmation_service.rs index 41da86a9fc..3eb66b90ab 100644 --- a/src/compute_leader_confirmation_service.rs +++ b/src/compute_leader_confirmation_service.rs @@ -157,11 +157,11 @@ impl Service for ComputeLeaderConfirmationService { pub mod tests { use crate::bank::Bank; use crate::compute_leader_confirmation_service::ComputeLeaderConfirmationService; - use crate::create_vote_account::*; + use crate::vote_signer_proxy::VoteSignerProxy; + use crate::local_vote_signer_service::LocalVoteSignerService; use crate::mint::Mint; - use crate::rpc_request::RpcClient; - use crate::vote_stage::create_new_signed_vote_transaction; + use crate::service::Service; use bincode::serialize; use solana_sdk::hash::hash; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -187,35 +187,28 @@ pub mod tests { }) .collect(); - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let rpc_client = RpcClient::new_from_socket(signer); + let (signer_service, addr) = LocalVoteSignerService::new(); // Create a total of 10 vote accounts, each will have a balance of 1 (after giving 1 to // their vote account), for a total staking pool of 10 tokens. let vote_accounts: Vec<_> = (0..10) .map(|i| { // Create new validator to vote - let validator_keypair = Keypair::new(); + let validator_keypair = Arc::new(Keypair::new()); let last_id = ids[i]; + let vote_signer = VoteSignerProxy::new(&validator_keypair, addr.clone()); // Give the validator some tokens bank.transfer(2, &mint.keypair(), validator_keypair.pubkey(), last_id) .unwrap(); - let vote_account = - create_vote_account(&validator_keypair, &bank, 1, last_id, &rpc_client) - .expect("Expected successful creation of account"); + vote_signer + .new_vote_account(&bank, 1, last_id) + .expect("Expected successful creation of account"); - let validator_keypair = Arc::new(validator_keypair); if i < 6 { - let vote_tx = create_new_signed_vote_transaction( - &last_id, - &validator_keypair, - (i + 1) as u64, - &vote_account, - &rpc_client, - ); + let vote_tx = vote_signer.new_signed_vote_transaction(&last_id, (i + 1) as u64); bank.process_transaction(&vote_tx).unwrap(); } - (vote_account, validator_keypair) + (vote_signer, validator_keypair) }) .collect(); @@ -229,14 +222,8 @@ pub mod tests { assert_eq!(bank.confirmation_time(), std::usize::MAX); // Get another validator to vote, so we now have 2/3 consensus - let vote_account = &vote_accounts[7].0; - let vote_tx = create_new_signed_vote_transaction( - &ids[6], - &vote_accounts[7].1, - 7, - &vote_account, - &rpc_client, - ); + let vote_signer = &vote_accounts[7].0; + let vote_tx = vote_signer.new_signed_vote_transaction(&ids[6], 7); bank.process_transaction(&vote_tx).unwrap(); ComputeLeaderConfirmationService::compute_confirmation( @@ -246,6 +233,6 @@ pub mod tests { ); assert!(bank.confirmation_time() != std::usize::MAX); assert!(last_confirmation_time > 0); - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); } } diff --git a/src/create_vote_account.rs b/src/create_vote_account.rs deleted file mode 100644 index 08600e0e66..0000000000 --- a/src/create_vote_account.rs +++ /dev/null @@ -1,61 +0,0 @@ -use crate::bank::Bank; -use crate::cluster_info::FULLNODE_PORT_RANGE; -use crate::result::Result; -use crate::rpc_request::{RpcClient, RpcRequest}; -use solana_netutil::find_available_port_in_range; -use solana_sdk::hash::Hash; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; -use solana_sdk::transaction::Transaction; -use solana_sdk::vote_transaction::*; -use solana_vote_signer::rpc::VoteSignerRpcService; -use std::io; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::thread::{Builder, JoinHandle}; - -pub fn local_vote_signer_service() -> io::Result<(SocketAddr, JoinHandle<()>, Arc)> { - let addr = match find_available_port_in_range(FULLNODE_PORT_RANGE) { - Ok(port) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port), - Err(e) => return Err(e), - }; - let service_addr = addr; - let exit = Arc::new(AtomicBool::new(false)); - let thread_exit = exit.clone(); - let thread = Builder::new() - .name("solana-vote-signer".to_string()) - .spawn(move || { - let service = VoteSignerRpcService::new(service_addr, thread_exit); - service.join().unwrap(); - }) - .unwrap(); - Ok((addr, thread, exit)) -} - -pub fn stop_local_vote_signer_service(t: JoinHandle<()>, exit: &Arc) { - exit.store(true, Ordering::Relaxed); - t.join().unwrap(); -} - -pub fn create_vote_account( - node_keypair: &Keypair, - bank: &Bank, - num_tokens: u64, - last_id: Hash, - rpc_client: &RpcClient, -) -> Result { - let msg = "Registering a new node"; - let sig = Signature::new(&node_keypair.sign(msg.as_bytes()).as_ref()); - let params = json!([node_keypair.pubkey(), sig, msg.as_bytes()]); - let resp = RpcRequest::RegisterNode - .make_rpc_request(&rpc_client, 1, Some(params)) - .unwrap(); - let new_vote_account: Pubkey = serde_json::from_value(resp).unwrap(); - - // Create and register the new vote account - let tx = Transaction::vote_account_new(node_keypair, new_vote_account, last_id, num_tokens, 0); - bank.process_transaction(&tx)?; - - Ok(new_vote_account) -} diff --git a/src/fullnode.rs b/src/fullnode.rs index a3122d69d4..df6779f805 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -652,18 +652,17 @@ impl Service for Fullnode { mod tests { use crate::bank::Bank; use crate::cluster_info::Node; - use crate::create_vote_account::*; use crate::db_ledger::*; use crate::entry::make_consecutive_blobs; use crate::fullnode::{Fullnode, FullnodeReturnType, NodeRole, TvuReturnType}; use crate::leader_scheduler::{ make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig, }; - use crate::rpc_request::{RpcClient, RpcRequest}; + use crate::local_vote_signer_service::LocalVoteSignerService; use crate::service::Service; use crate::streamer::responder; - use solana_sdk::pubkey::Pubkey; - use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; + use crate::vote_signer_proxy::VoteSignerProxy; + use solana_sdk::signature::{Keypair, KeypairUtil}; use std::cmp; use std::fs::remove_dir_all; use std::net::UdpSocket; @@ -861,7 +860,7 @@ mod tests { // Write the entries to the ledger that will cause leader rotation // after the bootstrap height - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); + let (signer_service, signer) = LocalVoteSignerService::new(); let (active_set_entries, validator_vote_account_id) = make_active_set_entries( &validator_keypair, signer, @@ -913,7 +912,8 @@ mod tests { { // Test that a node knows to transition to a validator based on parsing the ledger - let leader_vote_id = register_node(signer, bootstrap_leader_keypair.clone()); + let vote_signer = VoteSignerProxy::new(&bootstrap_leader_keypair, signer); + let leader_vote_id = vote_signer.vote_account.clone(); let bootstrap_leader = Fullnode::new( bootstrap_leader_node, &bootstrap_leader_ledger_path, @@ -962,21 +962,7 @@ mod tests { DbLedger::destroy(&path).expect("Expected successful database destruction"); let _ignored = remove_dir_all(&path); } - stop_local_vote_signer_service(t_signer, &signer_exit); - } - - fn register_node(signer: SocketAddr, keypair: Arc) -> Pubkey { - let rpc_client = RpcClient::new_from_socket(signer); - - let msg = "Registering a new node"; - let sig = Signature::new(&keypair.sign(msg.as_bytes()).as_ref()); - - let params = json!([keypair.pubkey(), sig, msg.as_bytes()]); - let resp = RpcRequest::RegisterNode - .make_rpc_request(&rpc_client, 1, Some(params)) - .unwrap(); - let vote_account_id: Pubkey = serde_json::from_value(resp).unwrap(); - vote_account_id + signer_service.join().unwrap(); } #[test] @@ -1013,7 +999,7 @@ mod tests { // after the bootstrap height // // 2) A vote from the validator - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); + let (signer_service, signer) = LocalVoteSignerService::new(); let (active_set_entries, _validator_vote_account_id) = make_active_set_entries( &validator_keypair, signer, @@ -1056,7 +1042,8 @@ mod tests { ); let validator_keypair = Arc::new(validator_keypair); - let vote_id = register_node(signer, validator_keypair.clone()); + let vote_signer = VoteSignerProxy::new(&validator_keypair, signer); + let vote_id = vote_signer.vote_account.clone(); // Start the validator let mut validator = Fullnode::new( validator_node, @@ -1132,7 +1119,7 @@ mod tests { ); // Shut down - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); t_responder.join().expect("responder thread join"); validator.close().unwrap(); DbLedger::destroy(&validator_ledger_path) diff --git a/src/leader_scheduler.rs b/src/leader_scheduler.rs index 71d918cea7..220c0ddc42 100644 --- a/src/leader_scheduler.rs +++ b/src/leader_scheduler.rs @@ -536,14 +536,14 @@ pub fn make_active_set_entries( #[cfg(test)] mod tests { use crate::bank::Bank; - use crate::create_vote_account::*; use crate::leader_scheduler::{ LeaderScheduler, LeaderSchedulerConfig, DEFAULT_BOOTSTRAP_HEIGHT, DEFAULT_LEADER_ROTATION_INTERVAL, DEFAULT_SEED_ROTATION_INTERVAL, }; + use crate::local_vote_signer_service::LocalVoteSignerService; use crate::mint::Mint; - use crate::rpc_request::RpcClient; - use crate::vote_stage::create_new_signed_vote_transaction; + use crate::service::Service; + use crate::vote_signer_proxy::VoteSignerProxy; use hashbrown::HashSet; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; @@ -559,16 +559,8 @@ mod tests { HashSet::from_iter(slice.iter().cloned()) } - fn push_vote( - keypair: &Arc, - vote_account: &Pubkey, - bank: &Bank, - height: u64, - last_id: Hash, - rpc_client: &RpcClient, - ) { - let new_vote_tx = - create_new_signed_vote_transaction(&last_id, keypair, height, vote_account, rpc_client); + fn push_vote(vote_signer: &VoteSignerProxy, bank: &Bank, height: u64, last_id: Hash) { + let new_vote_tx = vote_signer.new_signed_vote_transaction(&last_id, height); bank.process_transaction(&new_vote_tx).unwrap(); } @@ -607,11 +599,11 @@ mod tests { .last() .expect("Mint should not create empty genesis entries") .id; - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let rpc_client = RpcClient::new_from_socket(signer); + let (signer_service, signer) = LocalVoteSignerService::new(); for i in 0..num_validators { let new_validator = Keypair::new(); let new_pubkey = new_validator.pubkey(); + let vote_signer = VoteSignerProxy::new(&Arc::new(new_validator), signer.clone()); validators.push(new_pubkey); // Give the validator some tokens bank.transfer( @@ -623,24 +615,12 @@ mod tests { .unwrap(); // Create a vote account - let new_vote_account = create_vote_account( - &new_validator, - &bank, - num_vote_account_tokens as u64, - mint.last_id(), - &rpc_client, - ) - .unwrap(); + vote_signer + .new_vote_account(&bank, num_vote_account_tokens as u64, mint.last_id()) + .unwrap(); // Vote to make the validator part of the active set for the entire test // (we made the active_window_length large enough at the beginning of the test) - push_vote( - &Arc::new(new_validator), - &new_vote_account, - &bank, - 1, - mint.last_id(), - &rpc_client, - ); + push_vote(&vote_signer, &bank, 1, mint.last_id()); } // The scheduled leader during the bootstrapping period (assuming a seed + schedule @@ -717,7 +697,7 @@ mod tests { Some((current_leader, slot)) ); } - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); } #[test] @@ -737,8 +717,7 @@ mod tests { let start_height = 3; let num_old_ids = 20; let mut old_ids = HashSet::new(); - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let rpc_client = RpcClient::new_from_socket(signer); + let (signer_service, signer) = LocalVoteSignerService::new(); for _ in 0..num_old_ids { let new_keypair = Keypair::new(); let pk = new_keypair.pubkey(); @@ -749,18 +728,13 @@ mod tests { .unwrap(); // Create a vote account - let new_vote_account = - create_vote_account(&new_keypair, &bank, 1, mint.last_id(), &rpc_client).unwrap(); + let vote_signer = VoteSignerProxy::new(&Arc::new(new_keypair), signer); + vote_signer + .new_vote_account(&bank, 1 as u64, mint.last_id()) + .unwrap(); // Push a vote for the account - push_vote( - &Arc::new(new_keypair), - &new_vote_account, - &bank, - start_height, - mint.last_id(), - &rpc_client, - ); + push_vote(&vote_signer, &bank, start_height, mint.last_id()); } // Insert a bunch of votes at height "start_height + active_window_length" @@ -775,16 +749,16 @@ mod tests { .unwrap(); // Create a vote account - let new_vote_account = - create_vote_account(&new_keypair, &bank, 1, mint.last_id(), &rpc_client).unwrap(); + let vote_signer = VoteSignerProxy::new(&Arc::new(new_keypair), signer); + vote_signer + .new_vote_account(&bank, 1 as u64, mint.last_id()) + .unwrap(); push_vote( - &Arc::new(new_keypair), - &new_vote_account, + &vote_signer, &bank, start_height + active_window_length, mint.last_id(), - &rpc_client, ); } @@ -803,7 +777,7 @@ mod tests { let result = leader_scheduler.get_active_set(2 * active_window_length + start_height, &bank); assert!(result.is_empty()); - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); } #[test] @@ -1046,11 +1020,11 @@ mod tests { .last() .expect("Mint should not create empty genesis entries") .id; - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let rpc_client = RpcClient::new_from_socket(signer); + let (signer_service, signer) = LocalVoteSignerService::new(); for i in 0..num_validators { let new_validator = Keypair::new(); let new_pubkey = new_validator.pubkey(); + let vote_signer = VoteSignerProxy::new(&Arc::new(new_validator), signer); validators.push(new_pubkey); // Give the validator some tokens bank.transfer( @@ -1062,23 +1036,16 @@ mod tests { .unwrap(); // Create a vote account - let new_vote_account = create_vote_account( - &new_validator, - &bank, - num_vote_account_tokens as u64, - mint.last_id(), - &rpc_client, - ) - .unwrap(); + vote_signer + .new_vote_account(&bank, num_vote_account_tokens as u64, mint.last_id()) + .unwrap(); // Vote at height i * active_window_length for validator i push_vote( - &Arc::new(new_validator), - &new_vote_account, + &vote_signer, &bank, i * active_window_length + bootstrap_height, mint.last_id(), - &rpc_client, ); } @@ -1096,7 +1063,7 @@ mod tests { assert_eq!(vec![expected], *result); } - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); } #[test] @@ -1117,29 +1084,16 @@ mod tests { // window let initial_vote_height = 1; - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let rpc_client = RpcClient::new_from_socket(signer); + let (signer_service, signer) = LocalVoteSignerService::new(); + let vote_signer = VoteSignerProxy::new(&Arc::new(leader_keypair), signer); // Create a vote account - let new_vote_account = - create_vote_account(&leader_keypair, &bank, 1, mint.last_id(), &rpc_client).unwrap(); - let leader_keypair = Arc::new(leader_keypair); + vote_signer + .new_vote_account(&bank, 1 as u64, mint.last_id()) + .unwrap(); + // Vote twice - push_vote( - &leader_keypair, - &new_vote_account, - &bank, - initial_vote_height, - mint.last_id(), - &rpc_client, - ); - push_vote( - &leader_keypair, - &new_vote_account, - &bank, - initial_vote_height + 1, - mint.last_id(), - &rpc_client, - ); + push_vote(&vote_signer, &bank, initial_vote_height, mint.last_id()); + push_vote(&vote_signer, &bank, initial_vote_height + 1, mint.last_id()); let result = leader_scheduler.get_active_set(initial_vote_height + active_window_length, &bank); @@ -1147,7 +1101,7 @@ mod tests { let result = leader_scheduler.get_active_set(initial_vote_height + active_window_length + 1, &bank); assert!(result.is_empty()); - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); } #[test] @@ -1268,23 +1222,17 @@ mod tests { // Create and add validator to the active set let validator_keypair = Keypair::new(); let validator_id = validator_keypair.pubkey(); - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let rpc_client = RpcClient::new_from_socket(signer); + let (signer_service, signer) = LocalVoteSignerService::new(); if add_validator { bank.transfer(5, &mint.keypair(), validator_id, last_id) .unwrap(); // Create a vote account - let new_vote_account = - create_vote_account(&validator_keypair, &bank, 1, mint.last_id(), &rpc_client) - .unwrap(); - push_vote( - &Arc::new(validator_keypair), - &new_vote_account, - &bank, - initial_vote_height, - mint.last_id(), - &rpc_client, - ); + let vote_signer = VoteSignerProxy::new(&Arc::new(validator_keypair), signer); + vote_signer + .new_vote_account(&bank, 1 as u64, mint.last_id()) + .unwrap(); + + push_vote(&vote_signer, &bank, initial_vote_height, mint.last_id()); } // Make sure the bootstrap leader, not the validator, is picked again on next slot @@ -1311,24 +1259,13 @@ mod tests { .unwrap(); // Create a vote account - let new_vote_account = create_vote_account( - &bootstrap_leader_keypair, - &bank, - vote_account_tokens, - mint.last_id(), - &rpc_client, - ) - .unwrap(); + let vote_signer = VoteSignerProxy::new(&Arc::new(bootstrap_leader_keypair), signer); + vote_signer + .new_vote_account(&bank, vote_account_tokens as u64, mint.last_id()) + .unwrap(); // Add leader to the active set - push_vote( - &Arc::new(bootstrap_leader_keypair), - &new_vote_account, - &bank, - initial_vote_height, - mint.last_id(), - &rpc_client, - ); + push_vote(&vote_signer, &bank, initial_vote_height, mint.last_id()); leader_scheduler.generate_schedule(bootstrap_height, &bank); @@ -1339,7 +1276,7 @@ mod tests { } else { assert!(leader_scheduler.leader_schedule[0] == bootstrap_leader_id); } - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); } #[test] @@ -1444,40 +1381,23 @@ mod tests { // Create a vote account for the validator bank.transfer(5, &mint.keypair(), validator_id, last_id) .unwrap(); - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let rpc_client = RpcClient::new_from_socket(signer); - let new_validator_vote_account = - create_vote_account(&validator_keypair, &bank, 1, mint.last_id(), &rpc_client).unwrap(); - push_vote( - &Arc::new(validator_keypair), - &new_validator_vote_account, - &bank, - initial_vote_height, - mint.last_id(), - &rpc_client, - ); + let (signer_service, signer) = LocalVoteSignerService::new(); + let vote_signer = VoteSignerProxy::new(&Arc::new(validator_keypair), signer); + vote_signer + .new_vote_account(&bank, 1 as u64, mint.last_id()) + .unwrap(); + push_vote(&vote_signer, &bank, initial_vote_height, mint.last_id()); // Create a vote account for the leader bank.transfer(5, &mint.keypair(), bootstrap_leader_id, last_id) .unwrap(); - let new_leader_vote_account = create_vote_account( - &bootstrap_leader_keypair, - &bank, - 1, - mint.last_id(), - &rpc_client, - ) - .unwrap(); + let vote_signer = VoteSignerProxy::new(&Arc::new(bootstrap_leader_keypair), signer); + vote_signer + .new_vote_account(&bank, 1 as u64, mint.last_id()) + .unwrap(); // Add leader to the active set - push_vote( - &Arc::new(bootstrap_leader_keypair), - &new_leader_vote_account, - &bank, - initial_vote_height, - mint.last_id(), - &rpc_client, - ); + push_vote(&vote_signer, &bank, initial_vote_height, mint.last_id()); // Generate the schedule leader_scheduler.generate_schedule(bootstrap_height, &bank); @@ -1535,6 +1455,6 @@ mod tests { .max_height_for_leader(bootstrap_height + 2 * seed_rotation_interval + 1), None ); - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); } } diff --git a/src/lib.rs b/src/lib.rs index 57961e3b4d..853bb0fc72 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,6 @@ pub mod crds_gossip_pull; pub mod crds_gossip_push; pub mod crds_traits_impls; pub mod crds_value; -pub mod create_vote_account; #[macro_use] pub mod contact_info; pub mod cluster_info; @@ -42,6 +41,7 @@ pub mod fetch_stage; pub mod fullnode; pub mod gossip_service; pub mod leader_scheduler; +pub mod local_vote_signer_service; pub mod mint; pub mod packet; pub mod poh; @@ -68,7 +68,7 @@ pub mod thin_client; pub mod tpu; pub mod tpu_forwarder; pub mod tvu; -pub mod vote_stage; +pub mod vote_signer_proxy; pub mod window; pub mod window_service; diff --git a/src/local_vote_signer_service.rs b/src/local_vote_signer_service.rs new file mode 100644 index 0000000000..93b75255e6 --- /dev/null +++ b/src/local_vote_signer_service.rs @@ -0,0 +1,44 @@ +//! The `local_vote_signer_service` can be started locally to sign fullnode votes + +use crate::cluster_info::FULLNODE_PORT_RANGE; +use crate::service::Service; +use solana_vote_signer::rpc::VoteSignerRpcService; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread::{self, Builder, JoinHandle}; + +pub struct LocalVoteSignerService { + thread: JoinHandle<()>, + exit: Arc, +} + +impl Service for LocalVoteSignerService { + type JoinReturnType = (); + + fn join(self) -> thread::Result<()> { + self.exit.store(true, Ordering::Relaxed); + self.thread.join() + } +} + +impl LocalVoteSignerService { + #[allow(clippy::new_ret_no_self)] + pub fn new() -> (Self, SocketAddr) { + let addr = match solana_netutil::find_available_port_in_range(FULLNODE_PORT_RANGE) { + Ok(port) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port), + Err(_e) => panic!("Failed to find an available port for local vote signer service"), + }; + let exit = Arc::new(AtomicBool::new(false)); + let thread_exit = exit.clone(); + let thread = Builder::new() + .name("solana-vote-signer".to_string()) + .spawn(move || { + let service = VoteSignerRpcService::new(addr, thread_exit); + service.join().unwrap(); + }) + .unwrap(); + + (Self { thread, exit }, addr) + } +} diff --git a/src/replay_stage.rs b/src/replay_stage.rs index 1a4d34abaf..bef38ad652 100644 --- a/src/replay_stage.rs +++ b/src/replay_stage.rs @@ -9,10 +9,9 @@ use solana_sdk::hash::Hash; use crate::entry::EntrySlice; use crate::packet::BlobError; use crate::result::{Error, Result}; -use crate::rpc_request::RpcClient; use crate::service::Service; use crate::streamer::{responder, BlobSender}; -use crate::vote_stage::send_validator_vote; +use crate::vote_signer_proxy::VoteSignerProxy; use log::Level; use solana_metrics::{influxdb, submit}; use solana_sdk::pubkey::Pubkey; @@ -66,8 +65,8 @@ impl ReplayStage { cluster_info: &Arc>, window_receiver: &EntryReceiver, keypair: &Arc, - vote_account_id: &Pubkey, - vote_signer_rpc: &RpcClient, + _vote_account_id: &Pubkey, + vote_signer: &VoteSignerProxy, vote_blob_sender: Option<&BlobSender>, ledger_entry_sender: &EntrySender, entry_height: &mut u64, @@ -141,15 +140,9 @@ impl ReplayStage { if 0 == num_ticks_to_next_vote { if let Some(sender) = vote_blob_sender { - send_validator_vote( - bank, - &keypair, - &vote_account_id, - vote_signer_rpc, - &cluster_info, - sender, - ) - .unwrap(); + vote_signer + .send_validator_vote(bank, &cluster_info, sender) + .unwrap(); } } let (scheduled_leader, _) = bank @@ -219,14 +212,14 @@ impl ReplayStage { let keypair = Arc::new(keypair); let vote_account_id = *vote_account_id; - - let rpc_client = RpcClient::new_from_socket(*vote_signer_addr); + let vote_signer_addr = *vote_signer_addr; let t_replay = Builder::new() .name("solana-replay-stage".to_string()) .spawn(move || { let _exit = Finalizer::new(exit); let mut entry_height_ = entry_height; let mut last_entry_id = last_entry_id; + let vote_signer = VoteSignerProxy::new(&keypair, vote_signer_addr); loop { let (leader_id, _) = bank .get_current_leader() @@ -250,7 +243,7 @@ impl ReplayStage { &window_receiver, &keypair, &vote_account_id, - &rpc_client, + &vote_signer, Some(&vote_blob_sender), &ledger_entry_sender, &mut entry_height_, @@ -299,17 +292,15 @@ mod test { make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig, }; - use crate::create_vote_account::*; + use crate::local_vote_signer_service::LocalVoteSignerService; use crate::packet::BlobError; use crate::replay_stage::{ReplayStage, ReplayStageReturnType}; use crate::result::Error; - use crate::rpc_request::RpcClient; use crate::service::Service; - use crate::vote_stage::send_validator_vote; + use crate::vote_signer_proxy::VoteSignerProxy; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -344,7 +335,7 @@ mod test { // Write two entries to the ledger so that the validator is in the active set: // 1) Give the validator a nonzero number of tokens 2) A vote from the validator . // This will cause leader rotation after the bootstrap height - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); + let (signer_service, signer) = LocalVoteSignerService::new(); let (active_set_entries, vote_account_id) = make_active_set_entries(&my_keypair, signer, &mint.keypair(), &last_id, &last_id, 0); last_id = active_set_entries.last().unwrap().id; @@ -443,7 +434,7 @@ mod test { &entries_to_send[..leader_rotation_index + 1] ); - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); assert_eq!(exit.load(Ordering::Relaxed), true); @@ -480,12 +471,13 @@ mod test { let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); // Set up the replay stage - let vote_account_id = Keypair::new().pubkey(); + let (signer_service, signer) = LocalVoteSignerService::new(); let bank = Arc::new(bank); let (entry_sender, entry_receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); - let signer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); let my_keypair = Arc::new(my_keypair); + let vote_signer = VoteSignerProxy::new(&my_keypair, signer); + let vote_account_id = vote_signer.vote_account.clone(); let (replay_stage, ledger_writer_recv) = ReplayStage::new( my_keypair.clone(), &vote_account_id, @@ -501,14 +493,7 @@ mod test { // Vote sender should error because no leader contact info is found in the // ClusterInfo let (mock_sender, _mock_receiver) = channel(); - let _vote_err = send_validator_vote( - &bank, - &my_keypair, - &vote_account_id, - &RpcClient::new_from_socket(signer), - &cluster_info_me, - &mock_sender, - ); + let _vote_err = vote_signer.send_validator_vote(&bank, &cluster_info_me, &mock_sender); // Send ReplayStage an entry, should see it on the ledger writer receiver let next_tick = create_ticks( @@ -527,6 +512,7 @@ mod test { assert_eq!(next_tick, received_tick); drop(entry_sender); + signer_service.join().unwrap(); replay_stage .join() .expect("Expect successful ReplayStage exit"); @@ -560,7 +546,7 @@ mod test { // Write two entries to the ledger so that the validator is in the active set: // 1) Give the validator a nonzero number of tokens 2) A vote from the validator. // This will cause leader rotation after the bootstrap height - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); + let (signer_service, signer) = LocalVoteSignerService::new(); let (active_set_entries, vote_account_id) = make_active_set_entries(&my_keypair, signer, &mint.keypair(), &last_id, &last_id, 0); last_id = active_set_entries.last().unwrap().id; @@ -606,11 +592,12 @@ mod test { let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone()))); // Set up the replay stage + let my_keypair = Arc::new(my_keypair); + let signer_proxy = VoteSignerProxy::new(&my_keypair, signer); let vote_account_id = Arc::new(vote_account_id); let bank = Arc::new(bank); let (entry_sender, entry_receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); - let my_keypair = Arc::new(my_keypair); let (replay_stage, ledger_writer_recv) = ReplayStage::new( my_keypair.clone(), &vote_account_id, @@ -626,14 +613,7 @@ mod test { // Vote sender should error because no leader contact info is found in the // ClusterInfo let (mock_sender, _mock_receiver) = channel(); - let _vote_err = send_validator_vote( - &bank, - &my_keypair, - &vote_account_id, - &RpcClient::new_from_socket(signer), - &cluster_info_me, - &mock_sender, - ); + let _vote_err = signer_proxy.send_validator_vote(&bank, &cluster_info_me, &mock_sender); // Send enough ticks to trigger leader rotation let total_entries_to_send = (bootstrap_height - initial_tick_height) as usize; @@ -673,7 +653,7 @@ mod test { )), replay_stage.join().expect("replay stage join") ); - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); assert_eq!(exit.load(Ordering::Relaxed), true); let _ignored = remove_dir_all(&my_ledger_path); } @@ -713,16 +693,16 @@ mod test { .send(entries.clone()) .expect("Expected to err out"); - let signer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let rpc_client = RpcClient::new_from_socket(signer); let my_keypair = Arc::new(my_keypair); + let (signer_service, signer) = LocalVoteSignerService::new(); + let vote_signer = VoteSignerProxy::new(&my_keypair, signer); let res = ReplayStage::process_entries( &Arc::new(Bank::default()), &cluster_info_me, &entry_receiver, &my_keypair, &vote_keypair.pubkey(), - &rpc_client, + &vote_signer, None, &ledger_entry_sender, &mut entry_height, @@ -749,7 +729,7 @@ mod test { &entry_receiver, &Arc::new(Keypair::new()), &Keypair::new().pubkey(), - &rpc_client, + &vote_signer, None, &ledger_entry_sender, &mut entry_height, @@ -765,7 +745,7 @@ mod test { e ), } - + signer_service.join().unwrap(); let _ignored = remove_dir_all(&my_ledger_path); } } diff --git a/src/result.rs b/src/result.rs index fba4b14bef..e93a41c80f 100644 --- a/src/result.rs +++ b/src/result.rs @@ -7,7 +7,7 @@ use crate::db_ledger; use crate::erasure; use crate::packet; use crate::poh_recorder; -use crate::vote_stage; +use crate::vote_signer_proxy; use bincode; use serde_json; use std; @@ -29,7 +29,7 @@ pub enum Error { ErasureError(erasure::ErasureError), SendError, PohRecorderError(poh_recorder::PohRecorderError), - VoteError(vote_stage::VoteError), + VoteError(vote_signer_proxy::VoteError), DbLedgerError(db_ledger::DbLedgerError), } @@ -104,8 +104,8 @@ impl std::convert::From for Error { Error::PohRecorderError(e) } } -impl std::convert::From for Error { - fn from(e: vote_stage::VoteError) -> Error { +impl std::convert::From for Error { + fn from(e: vote_signer_proxy::VoteError) -> Error { Error::VoteError(e) } } diff --git a/src/vote_signer_proxy.rs b/src/vote_signer_proxy.rs new file mode 100644 index 0000000000..78909d1b44 --- /dev/null +++ b/src/vote_signer_proxy.rs @@ -0,0 +1,143 @@ +//! The `vote_signer_proxy` votes on the `last_id` of the bank at a regular cadence + +use crate::bank::Bank; +use crate::cluster_info::ClusterInfo; +use crate::counter::Counter; +use crate::packet::SharedBlob; +use crate::result::{Error, Result}; +use crate::rpc_request::{RpcClient, RpcRequest}; +use crate::streamer::BlobSender; +use bincode::serialize; +use log::Level; +use solana_sdk::hash::Hash; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; +use solana_sdk::transaction::Transaction; +use solana_sdk::vote_program::Vote; +use solana_sdk::vote_transaction::VoteTransaction; +use std::net::SocketAddr; +use std::sync::atomic::AtomicUsize; +use std::sync::{Arc, RwLock}; + +#[derive(Debug, PartialEq, Eq)] +pub enum VoteError { + NoValidSupermajority, + NoLeader, + LeaderInfoNotFound, +} + +pub struct VoteSignerProxy { + rpc_client: RpcClient, + keypair: Arc, + pub vote_account: Pubkey, +} + +impl VoteSignerProxy { + pub fn new(keypair: &Arc, signer: SocketAddr) -> Self { + let rpc_client = RpcClient::new_from_socket(signer); + + let msg = "Registering a new node"; + let sig = Signature::new(&keypair.sign(msg.as_bytes()).as_ref()); + let params = json!([keypair.pubkey(), sig, msg.as_bytes()]); + let resp = RpcRequest::RegisterNode + .retry_make_rpc_request(&rpc_client, 1, Some(params), 5) + .unwrap(); + let vote_account: Pubkey = serde_json::from_value(resp).unwrap(); + + Self { + rpc_client, + keypair: keypair.clone(), + vote_account, + } + } + + pub fn new_vote_account(&self, bank: &Bank, num_tokens: u64, last_id: Hash) -> Result<()> { + // Create and register the new vote account + let tx = + Transaction::vote_account_new(&self.keypair, self.vote_account, last_id, num_tokens, 0); + bank.process_transaction(&tx)?; + Ok(()) + } + + pub fn send_validator_vote( + &self, + bank: &Arc, + cluster_info: &Arc>, + vote_blob_sender: &BlobSender, + ) -> Result<()> { + let last_id = bank.last_id(); + + if let Ok(shared_blob) = self.new_signed_vote_blob(&last_id, bank, cluster_info) { + inc_new_counter_info!("validator-vote_sent", 1); + vote_blob_sender.send(vec![shared_blob])?; + } + + Ok(()) + } + + pub fn new_signed_vote_transaction(&self, last_id: &Hash, tick_height: u64) -> Transaction { + let vote = Vote { tick_height }; + let tx = Transaction::vote_new(&self.vote_account, vote, *last_id, 0); + + let msg = tx.get_sign_data(); + let sig = Signature::new(&self.keypair.sign(&msg).as_ref()); + + let keypair = self.keypair.clone(); + let params = json!([keypair.pubkey(), sig, &msg]); + let resp = RpcRequest::SignVote + .make_rpc_request(&self.rpc_client, 1, Some(params)) + .unwrap(); + let vote_signature: Signature = serde_json::from_value(resp).unwrap(); + + Transaction { + signatures: vec![vote_signature], + account_keys: tx.account_keys, + last_id: tx.last_id, + fee: tx.fee, + program_ids: tx.program_ids, + instructions: tx.instructions, + } + } + + // TODO: Change voting to be on fixed tick intervals based on bank state + fn new_signed_vote_blob( + &self, + last_id: &Hash, + bank: &Arc, + cluster_info: &Arc>, + ) -> Result { + let shared_blob = SharedBlob::default(); + let tick_height = bank.tick_height(); + + let leader_tpu = VoteSignerProxy::get_leader_tpu(&bank, cluster_info)?; + //TODO: doesn't seem like there is a synchronous call to get height and id + debug!("voting on {:?}", &last_id.as_ref()[..8]); + let tx = self.new_signed_vote_transaction(last_id, tick_height); + + { + let mut blob = shared_blob.write().unwrap(); + let bytes = serialize(&tx)?; + let len = bytes.len(); + blob.data[..len].copy_from_slice(&bytes); + blob.meta.set_addr(&leader_tpu); + blob.meta.size = len; + }; + + Ok(shared_blob) + } + + fn get_leader_tpu(bank: &Bank, cluster_info: &Arc>) -> Result { + let leader_id = match bank.get_current_leader() { + Some((leader_id, _)) => leader_id, + None => return Err(Error::VoteError(VoteError::NoLeader)), + }; + + let rcluster_info = cluster_info.read().unwrap(); + let leader_tpu = rcluster_info.lookup(leader_id).map(|leader| leader.tpu); + if let Some(leader_tpu) = leader_tpu { + Ok(leader_tpu) + } else { + Err(Error::VoteError(VoteError::LeaderInfoNotFound)) + } + } +} diff --git a/src/vote_stage.rs b/src/vote_stage.rs deleted file mode 100644 index 8a8cea7399..0000000000 --- a/src/vote_stage.rs +++ /dev/null @@ -1,126 +0,0 @@ -//! The `vote_stage` votes on the `last_id` of the bank at a regular cadence - -use crate::bank::Bank; -use crate::cluster_info::ClusterInfo; -use crate::counter::Counter; -use crate::packet::SharedBlob; -use crate::result::{Error, Result}; -use crate::rpc_request::{RpcClient, RpcRequest}; -use crate::streamer::BlobSender; -use bincode::serialize; -use log::Level; -use solana_sdk::hash::Hash; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; -use solana_sdk::transaction::Transaction; -use solana_sdk::vote_program::Vote; -use solana_sdk::vote_transaction::VoteTransaction; -use std::net::SocketAddr; -use std::sync::atomic::AtomicUsize; -use std::sync::{Arc, RwLock}; - -#[derive(Debug, PartialEq, Eq)] -pub enum VoteError { - NoValidSupermajority, - NoLeader, - LeaderInfoNotFound, -} - -pub fn create_new_signed_vote_transaction( - last_id: &Hash, - keypair: &Arc, - tick_height: u64, - vote_account: &Pubkey, - rpc_client: &RpcClient, -) -> Transaction { - let vote = Vote { tick_height }; - let tx = Transaction::vote_new(&vote_account, vote, *last_id, 0); - - let msg = tx.get_sign_data(); - let sig = Signature::new(&keypair.sign(&msg).as_ref()); - - let params = json!([keypair.pubkey(), sig, &msg]); - let resp = RpcRequest::SignVote - .make_rpc_request(&rpc_client, 1, Some(params)) - .unwrap(); - let vote_signature: Signature = serde_json::from_value(resp).unwrap(); - - Transaction { - signatures: vec![vote_signature], - account_keys: tx.account_keys, - last_id: tx.last_id, - fee: tx.fee, - program_ids: tx.program_ids, - instructions: tx.instructions, - } -} - -// TODO: Change voting to be on fixed tick intervals based on bank state -pub fn create_new_signed_vote_blob( - last_id: &Hash, - keypair: &Arc, - vote_account: &Pubkey, - rpc_client: &RpcClient, - bank: &Arc, - cluster_info: &Arc>, -) -> Result { - let shared_blob = SharedBlob::default(); - let tick_height = bank.tick_height(); - - let leader_tpu = get_leader_tpu(&bank, cluster_info)?; - //TODO: doesn't seem like there is a synchronous call to get height and id - debug!("voting on {:?}", &last_id.as_ref()[..8]); - let tx = - create_new_signed_vote_transaction(last_id, keypair, tick_height, vote_account, rpc_client); - - { - let mut blob = shared_blob.write().unwrap(); - let bytes = serialize(&tx)?; - let len = bytes.len(); - blob.data[..len].copy_from_slice(&bytes); - blob.meta.set_addr(&leader_tpu); - blob.meta.size = len; - }; - - Ok(shared_blob) -} - -fn get_leader_tpu(bank: &Bank, cluster_info: &Arc>) -> Result { - let leader_id = match bank.get_current_leader() { - Some((leader_id, _)) => leader_id, - None => return Err(Error::VoteError(VoteError::NoLeader)), - }; - - let rcluster_info = cluster_info.read().unwrap(); - let leader_tpu = rcluster_info.lookup(leader_id).map(|leader| leader.tpu); - if let Some(leader_tpu) = leader_tpu { - Ok(leader_tpu) - } else { - Err(Error::VoteError(VoteError::LeaderInfoNotFound)) - } -} - -pub fn send_validator_vote( - bank: &Arc, - keypair: &Arc, - vote_account: &Pubkey, - vote_signer_rpc: &RpcClient, - cluster_info: &Arc>, - vote_blob_sender: &BlobSender, -) -> Result<()> { - let last_id = bank.last_id(); - - if let Ok(shared_blob) = create_new_signed_vote_blob( - &last_id, - keypair, - vote_account, - vote_signer_rpc, - bank, - cluster_info, - ) { - inc_new_counter_info!("validator-vote_sent", 1); - vote_blob_sender.send(vec![shared_blob])?; - } - - Ok(()) -} diff --git a/tests/multinode.rs b/tests/multinode.rs index cc874b8921..8f50ed93e7 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -2,7 +2,6 @@ extern crate log; extern crate bincode; extern crate chrono; -#[macro_use] extern crate serde_json; extern crate solana; extern crate solana_sdk; @@ -11,24 +10,23 @@ extern crate solana_vote_signer; use solana::blob_fetch_stage::BlobFetchStage; use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; use solana::contact_info::ContactInfo; -use solana::create_vote_account::*; use solana::db_ledger::{create_tmp_genesis, create_tmp_sample_ledger, tmp_copy_ledger}; use solana::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; use solana::entry::{reconstruct_entries_from_blobs, Entry}; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::gossip_service::GossipService; use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; - +use solana::local_vote_signer_service::LocalVoteSignerService; use solana::mint::Mint; use solana::packet::SharedBlob; use solana::poh_service::NUM_TICKS_PER_SECOND; use solana::result; -use solana::rpc_request::{RpcClient, RpcRequest}; use solana::service::Service; use solana::thin_client::{retry_get_balance, ThinClient}; +use solana::vote_signer_proxy::VoteSignerProxy; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; +use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::timing::{duration_as_ms, duration_as_s}; use solana_sdk::transaction::Transaction; @@ -127,20 +125,6 @@ fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec { .collect() } -fn register_node(signer: SocketAddr, keypair: Arc) -> Pubkey { - let rpc_client = RpcClient::new_from_socket(signer); - - let msg = "Registering a new node"; - let sig = Signature::new(&keypair.sign(msg.as_bytes()).as_ref()); - - let params = json!([keypair.pubkey(), sig, msg.as_bytes()]); - let resp = RpcRequest::RegisterNode - .retry_make_rpc_request(&rpc_client, 1, Some(params), 5) - .unwrap(); - let vote_account_id: Pubkey = serde_json::from_value(resp).unwrap(); - vote_account_id -} - #[test] fn test_multi_node_ledger_window() -> result::Result<()> { solana_logger::setup(); @@ -174,8 +158,9 @@ fn test_multi_node_ledger_window() -> result::Result<()> { .unwrap(); } - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let vote_id = register_node(signer, leader_keypair.clone()); + let (signer_service, signer) = LocalVoteSignerService::new(); + let signer_proxy = VoteSignerProxy::new(&leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let leader = Fullnode::new( leader, &leader_ledger_path, @@ -194,7 +179,8 @@ fn test_multi_node_ledger_window() -> result::Result<()> { let validator_pubkey = keypair.pubkey().clone(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.info.clone(); - let validator_vote_id = register_node(signer, keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&keypair, signer); + let validator_vote_id = signer_proxy.vote_account.clone(); let validator = Fullnode::new( validator, &zero_ledger_path, @@ -239,7 +225,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { validator.close()?; leader.close()?; - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); for path in ledger_paths { remove_dir_all(path).unwrap(); @@ -275,8 +261,9 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { ); ledger_paths.push(zero_ledger_path.clone()); - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let vote_id = register_node(signer, leader_keypair.clone()); + let (signer_service, signer) = LocalVoteSignerService::new(); + let signer_proxy = VoteSignerProxy::new(&leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let server = Fullnode::new( leader, &leader_ledger_path, @@ -306,7 +293,8 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { .unwrap(); info!("validator balance {}", validator_balance); - let vote_id = register_node(signer, keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let val = Fullnode::new( validator, &ledger_path, @@ -346,7 +334,8 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { // balances let keypair = Arc::new(Keypair::new()); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); - let vote_id = register_node(signer, keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let val = Fullnode::new( validator, &zero_ledger_path, @@ -395,7 +384,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { node.close()?; } - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); for path in ledger_paths { remove_dir_all(path).unwrap(); @@ -420,8 +409,9 @@ fn test_multi_node_basic() { let (alice, leader_ledger_path) = create_tmp_genesis("multi_node_basic", 10_000, leader_data.id, 500); ledger_paths.push(leader_ledger_path.clone()); - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let vote_id = register_node(signer, leader_keypair.clone()); + let (signer_service, signer) = LocalVoteSignerService::new(); + let signer_proxy = VoteSignerProxy::new(&leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let server = Fullnode::new( leader, &leader_ledger_path, @@ -447,7 +437,8 @@ fn test_multi_node_basic() { send_tx_and_retry_get_balance(&leader_data, &alice, &validator_pubkey, 500, None) .unwrap(); info!("validator balance {}", validator_balance); - let vote_id = register_node(signer, keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let val = Fullnode::new( validator, &ledger_path, @@ -484,7 +475,7 @@ fn test_multi_node_basic() { for node in nodes { node.close().unwrap(); } - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); for path in ledger_paths { remove_dir_all(path).unwrap(); @@ -505,8 +496,9 @@ fn test_boot_validator_from_file() -> result::Result<()> { ledger_paths.push(leader_ledger_path.clone()); let leader_data = leader.info.clone(); - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let vote_id = register_node(signer, leader_keypair.clone()); + let (signer_service, signer) = LocalVoteSignerService::new(); + let signer_proxy = VoteSignerProxy::new(&leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let leader_fullnode = Fullnode::new( leader, &leader_ledger_path, @@ -530,7 +522,8 @@ fn test_boot_validator_from_file() -> result::Result<()> { let validator_data = validator.info.clone(); let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file"); ledger_paths.push(ledger_path.clone()); - let vote_id = register_node(signer, keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let val_fullnode = Fullnode::new( validator, &ledger_path, @@ -548,7 +541,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { val_fullnode.close()?; leader_fullnode.close()?; - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); for path in ledger_paths { remove_dir_all(path)?; @@ -597,8 +590,9 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { ); let bob_pubkey = Keypair::new().pubkey(); - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let vote_id = register_node(signer, leader_keypair.clone()); + let (signer_service, signer) = LocalVoteSignerService::new(); + let signer_proxy = VoteSignerProxy::new(&leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let (leader_data, leader_fullnode) = create_leader(&ledger_path, leader_keypair.clone(), &vote_id, &signer); @@ -633,7 +627,8 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.info.clone(); - let vote_id = register_node(signer, keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let val_fullnode = Fullnode::new( validator, &stale_ledger_path, @@ -669,7 +664,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { val_fullnode.close()?; leader_fullnode.close()?; - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); remove_dir_all(ledger_path)?; remove_dir_all(stale_ledger_path)?; @@ -703,8 +698,9 @@ fn test_multi_node_dynamic_network() { let alice_arc = Arc::new(RwLock::new(alice)); let leader_data = leader.info.clone(); - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let vote_id = register_node(signer, leader_keypair.clone()); + let (signer_service, signer) = LocalVoteSignerService::new(); + let signer_proxy = VoteSignerProxy::new(&leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let server = Fullnode::new( leader, &leader_ledger_path, @@ -775,7 +771,8 @@ fn test_multi_node_dynamic_network() { let rd = validator.info.clone(); info!("starting {} {}", keypair.pubkey(), rd.id); let keypair = Arc::new(keypair); - let vote_id = register_node(signer, keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let val = Fullnode::new( validator, &ledger_path, @@ -866,7 +863,7 @@ fn test_multi_node_dynamic_network() { } } - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); assert_eq!(consecutive_success, 10); for (_, node) in &validators { node.exit(); @@ -913,7 +910,7 @@ fn test_leader_to_validator_transition() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); + let (signer_service, signer) = LocalVoteSignerService::new(); let (bootstrap_entries, _) = make_active_set_entries( &validator_keypair, signer, @@ -942,7 +939,8 @@ fn test_leader_to_validator_transition() { Some(bootstrap_height), ); - let vote_id = register_node(signer, leader_keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let mut leader = Fullnode::new( leader_node, &leader_ledger_path, @@ -1024,7 +1022,7 @@ fn test_leader_to_validator_transition() { ); assert_eq!(bank.tick_height(), bootstrap_height); - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); remove_dir_all(leader_ledger_path).unwrap(); } @@ -1069,7 +1067,7 @@ fn test_leader_validator_basic() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); + let (signer_service, signer) = LocalVoteSignerService::new(); let (active_set_entries, _vote_account_keypair) = make_active_set_entries( &validator_keypair, signer, @@ -1100,7 +1098,8 @@ fn test_leader_validator_basic() { ); // Start the validator node - let vote_id = register_node(signer, validator_keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&validator_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let mut validator = Fullnode::new( validator_node, &validator_ledger_path, @@ -1114,7 +1113,8 @@ fn test_leader_validator_basic() { ); // Start the leader fullnode - let vote_id = register_node(signer, leader_keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let mut leader = Fullnode::new( leader_node, &leader_ledger_path, @@ -1189,7 +1189,7 @@ fn test_leader_validator_basic() { for (v, l) in validator_entries.iter().zip(leader_entries) { assert_eq!(*v, l); } - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); for path in ledger_paths { DbLedger::destroy(&path).expect("Expected successful database destruction"); @@ -1262,7 +1262,7 @@ fn test_dropped_handoff_recovery() { // Make the entries to give the next_leader validator some stake so that they will be in // leader election active set - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); + let (signer_service, signer) = LocalVoteSignerService::new(); let (active_set_entries, _vote_account_keypair) = make_active_set_entries( &next_leader_keypair, signer, @@ -1306,7 +1306,8 @@ fn test_dropped_handoff_recovery() { Some(leader_rotation_interval), ); - let vote_id = register_node(signer, bootstrap_leader_keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&bootstrap_leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); // Start up the bootstrap leader fullnode let bootstrap_leader = Fullnode::new( bootstrap_leader_node, @@ -1332,7 +1333,8 @@ fn test_dropped_handoff_recovery() { ledger_paths.push(validator_ledger_path.clone()); let validator_id = kp.pubkey(); let validator_node = Node::new_localhost_with_pubkey(validator_id); - let vote_id = register_node(signer, kp.clone()); + let signer_proxy = VoteSignerProxy::new(&kp, signer); + let vote_id = signer_proxy.vote_account.clone(); let validator = Fullnode::new( validator_node, &validator_ledger_path, @@ -1360,7 +1362,8 @@ fn test_dropped_handoff_recovery() { // Now start up the "next leader" node let next_leader_node = Node::new_localhost_with_pubkey(next_leader_keypair.pubkey()); - let vote_id = register_node(signer, next_leader_keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&next_leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let mut next_leader = Fullnode::new( next_leader_node, &next_leader_ledger_path, @@ -1381,7 +1384,7 @@ fn test_dropped_handoff_recovery() { nodes.push(next_leader); - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); for node in nodes { node.close().unwrap(); } @@ -1442,7 +1445,7 @@ fn test_full_leader_validator_network() { let mut ledger_paths = Vec::new(); ledger_paths.push(bootstrap_leader_ledger_path.clone()); - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); + let (signer_service, signer) = LocalVoteSignerService::new(); let mut vote_account_keypairs = VecDeque::new(); for node_keypair in node_keypairs.iter() { // Make entries to give each node some stake so that they will be in the @@ -1512,7 +1515,8 @@ fn test_full_leader_validator_network() { let validator_id = kp.pubkey(); let validator_node = Node::new_localhost_with_pubkey(validator_id); let kp = Arc::new(kp); - let vote_id = register_node(signer, kp.clone()); + let signer_proxy = VoteSignerProxy::new(&kp, signer); + let vote_id = signer_proxy.vote_account.clone(); let validator = Arc::new(RwLock::new(Fullnode::new( validator_node, &validator_ledger_path, @@ -1531,7 +1535,8 @@ fn test_full_leader_validator_network() { // Start up the bootstrap leader let leader_keypair = Arc::new(leader_keypair); - let vote_id = register_node(signer, leader_keypair.clone()); + let signer_proxy = VoteSignerProxy::new(&leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new( bootstrap_leader_node, &bootstrap_leader_ledger_path, @@ -1651,7 +1656,7 @@ fn test_full_leader_validator_network() { assert!(shortest.unwrap() >= target_height); - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); for path in ledger_paths { DbLedger::destroy(&path).expect("Expected successful database destruction"); remove_dir_all(path).unwrap(); @@ -1714,9 +1719,10 @@ fn test_broadcast_last_tick() { ); // Start up the bootstrap leader fullnode - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); let bootstrap_leader_keypair = Arc::new(bootstrap_leader_keypair); - let vote_id = bootstrap_leader_keypair.pubkey(); + let (signer_service, signer) = LocalVoteSignerService::new(); + let signer_proxy = VoteSignerProxy::new(&bootstrap_leader_keypair, signer); + let vote_id = signer_proxy.vote_account.clone(); let mut bootstrap_leader = Fullnode::new( bootstrap_leader_node, &bootstrap_leader_ledger_path, @@ -1772,7 +1778,7 @@ fn test_broadcast_last_tick() { bf.join().unwrap(); } - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); // Shut down the listeners for node in listening_nodes { node.0.close().unwrap(); diff --git a/tests/replicator.rs b/tests/replicator.rs index 7901508ee1..e2e5f4e6da 100644 --- a/tests/replicator.rs +++ b/tests/replicator.rs @@ -1,23 +1,24 @@ #[macro_use] extern crate log; +#[cfg(feature = "chacha")] #[macro_use] extern crate serde_json; use bincode::deserialize; use solana::client::mk_client; use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; -use solana::create_vote_account::*; use solana::db_ledger::DbLedger; use solana::db_ledger::{create_tmp_genesis, get_tmp_ledger_path, tmp_copy_ledger}; use solana::entry::Entry; use solana::fullnode::Fullnode; use solana::leader_scheduler::LeaderScheduler; +use solana::local_vote_signer_service::LocalVoteSignerService; use solana::replicator::Replicator; -use solana::rpc_request::{RpcClient, RpcRequest}; +use solana::service::Service; use solana::streamer::blob_receiver; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; +use solana::vote_signer_proxy::VoteSignerProxy; +use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::system_transaction::SystemTransaction; use solana_sdk::transaction::Transaction; use std::fs::remove_dir_all; @@ -44,17 +45,9 @@ fn test_replicator_startup() { tmp_copy_ledger(&leader_ledger_path, "replicator_test_validator_ledger"); { - let (signer, t_signer, signer_exit) = local_vote_signer_service().unwrap(); - let rpc_client = RpcClient::new_from_socket(signer); - - let msg = "Registering a new node"; - let sig = Signature::new(&leader_keypair.sign(msg.as_bytes()).as_ref()); - - let params = json!([leader_keypair.pubkey(), sig, msg.as_bytes()]); - let resp = RpcRequest::RegisterNode - .make_rpc_request(&rpc_client, 1, Some(params)) - .unwrap(); - let vote_account_id: Pubkey = serde_json::from_value(resp).unwrap(); + let (signer_service, signer) = LocalVoteSignerService::new(); + let signer_proxy = VoteSignerProxy::new(&leader_keypair, signer); + let vote_account_id = signer_proxy.vote_account.clone(); let leader = Fullnode::new( leader_node, @@ -69,17 +62,9 @@ fn test_replicator_startup() { ); let validator_keypair = Arc::new(Keypair::new()); + let signer_proxy = VoteSignerProxy::new(&validator_keypair, signer); + let vote_account_id = signer_proxy.vote_account.clone(); let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); - - let msg = "Registering a new node"; - let sig = Signature::new(&validator_keypair.sign(msg.as_bytes()).as_ref()); - - let params = json!([validator_keypair.pubkey(), sig, msg.as_bytes()]); - let resp = RpcRequest::RegisterNode - .make_rpc_request(&rpc_client, 1, Some(params)) - .unwrap(); - let vote_account_id: Pubkey = serde_json::from_value(resp).unwrap(); - #[cfg(feature = "chacha")] let validator_node_info = validator_node.info.clone(); @@ -197,7 +182,7 @@ fn test_replicator_startup() { // Check that some ledger was downloaded assert!(num_txs != 0); - stop_local_vote_signer_service(t_signer, &signer_exit); + signer_service.join().unwrap(); replicator.close(); validator.exit();