remove set_leader from cluster_info (#4998)

Rob Walker authored on 2019-07-09 22:06:47 -07:00; committed by GitHub
parent aebd70ddce
commit f537482c86
7 changed files with 53 additions and 89 deletions

View File

@ -107,12 +107,13 @@ impl BankForks {
}
}
pub fn insert(&mut self, bank: Bank) {
pub fn insert(&mut self, bank: Bank) -> Arc<Bank> {
let bank = Arc::new(bank);
let prev = self.banks.insert(bank.slot(), bank.clone());
assert!(prev.is_none());
self.working_bank = bank.clone();
bank
}
// TODO: really want to kill this...
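
Note (not part of this diff): a minimal sketch of the caller pattern the new return value enables, assuming the same types used in the ReplayStage hunk below.

fn make_tpu_bank(
    bank_forks: &Arc<RwLock<BankForks>>,
    parent: &Arc<Bank>,
    my_pubkey: &Pubkey,
    poh_slot: u64,
) -> Arc<Bank> {
    // insert() now hands back the Arc<Bank> it just stored, so the caller can
    // keep using the bank without a second lookup into the forks map.
    bank_forks
        .write()
        .unwrap()
        .insert(Bank::new_from_parent(parent, my_pubkey, poh_slot))
}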

View File

@ -78,9 +78,6 @@ pub struct ClusterInfo {
pub gossip: CrdsGossip,
/// set the keypair that will be used to sign crds values generated. It is unset only in tests.
pub(crate) keypair: Arc<Keypair>,
// TODO: remove gossip_leader_pubkey once all usage of `set_leader()` and `leader_data()` is
// purged
gossip_leader_pubkey: Pubkey,
/// The network entrypoint
entrypoint: Option<ContactInfo>,
}
@ -181,7 +178,6 @@ impl ClusterInfo {
let mut me = Self {
gossip: CrdsGossip::default(),
keypair,
gossip_leader_pubkey: Pubkey::default(),
entrypoint: None,
};
let id = contact_info.id;
@ -237,15 +233,6 @@ impl ClusterInfo {
self.lookup(&self.id()).cloned().unwrap()
}
// Deprecated: don't use leader_data().
pub fn leader_data(&self) -> Option<&ContactInfo> {
let leader_pubkey = self.gossip_leader_pubkey;
if leader_pubkey == Pubkey::default() {
return None;
}
self.lookup(&leader_pubkey)
}
pub fn contact_info_trace(&self) -> String {
let now = timestamp();
let mut spy_nodes = 0;
@ -302,17 +289,6 @@ impl ClusterInfo {
)
}
/// Record the id of the current leader for use by `leader_tpu_via_blobs()`
pub fn set_leader(&mut self, leader_pubkey: &Pubkey) {
if *leader_pubkey != self.gossip_leader_pubkey {
warn!(
"{}: LEADER_UPDATE TO {} from {}",
self.gossip.id, leader_pubkey, self.gossip_leader_pubkey,
);
self.gossip_leader_pubkey = *leader_pubkey;
}
}
pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: BTreeSet<u64>) {
let now = timestamp();
let mut entry = CrdsValue::EpochSlots(EpochSlots::new(id, root, slots, now));
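
Note (not part of this diff): with set_leader() and leader_data() gone, a sketch of the assumed replacement pattern — derive the leader pubkey from the bank (as the RPC hunk below does) and resolve its contact info through the existing gossip lookup().

fn leader_contact_info(
    cluster_info: &Arc<RwLock<ClusterInfo>>,
    bank: &Arc<Bank>,
) -> Option<ContactInfo> {
    // Leader identity now comes from the bank's collector rather than a
    // gossip-side pubkey; gossip is only used to map pubkey -> ContactInfo.
    let leader_pubkey = *bank.collector_id();
    cluster_info.read().unwrap().lookup(&leader_pubkey).cloned()
}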
@ -1949,17 +1925,6 @@ mod tests {
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_default_leader() {
solana_logger::setup();
let contact_info = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
let network_entry_point =
ContactInfo::new_gossip_entry_point(&socketaddr!("127.0.0.1:1239"));
cluster_info.insert_info(network_entry_point);
assert!(cluster_info.leader_data().is_none());
}
fn assert_in_range(x: u16, range: (u16, u16)) {
assert!(x >= range.0);
assert!(x < range.1);
@ -2043,12 +2008,9 @@ mod tests {
//create new cluster info, leader, and peer
let keypair = Keypair::new();
let peer_keypair = Keypair::new();
let leader_keypair = Keypair::new();
let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
let leader = ContactInfo::new_localhost(&leader_keypair.pubkey(), 0);
let peer = ContactInfo::new_localhost(&peer_keypair.pubkey(), 0);
let mut cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair));
cluster_info.set_leader(&leader.id);
cluster_info.insert_info(peer.clone());
cluster_info.gossip.refresh_push_active_set(&HashMap::new());
//check that all types of gossip messages are signed correctly

View File

@ -147,7 +147,7 @@ impl ContactInfo {
}
#[cfg(test)]
fn new_with_pubkey_socketaddr(pubkey: &Pubkey, bind_addr: &SocketAddr) -> Self {
pub(crate) fn new_with_pubkey_socketaddr(pubkey: &Pubkey, bind_addr: &SocketAddr) -> Self {
fn next_port(addr: &SocketAddr, nxt: u16) -> SocketAddr {
let mut nxt_addr = *addr;
nxt_addr.set_port(addr.port() + nxt);
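
Note (not part of this diff): the widened visibility exists so the RPC tests can build a ContactInfo for a known pubkey; a sketch mirroring the test hunk further down.

// Insert a gossip entry whose id matches the bank's collector so that
// getSlotLeader resolves to a real peer in the test.
let leader_pubkey = *bank.collector_id();
cluster_info.write().unwrap().insert_info(
    ContactInfo::new_with_pubkey_socketaddr(&leader_pubkey, &socketaddr!("127.0.0.1:1234")),
);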

View File

@ -163,7 +163,6 @@ impl ReplayStage {
&my_pubkey,
&bank_forks,
&poh_recorder,
&cluster_info,
&leader_schedule_cache,
);
@ -189,10 +188,9 @@ impl ReplayStage {
my_pubkey: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) {
let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) = {
let (grace_ticks, poh_slot, parent_slot) = {
let poh_recorder = poh_recorder.lock().unwrap();
// we're done
@ -201,19 +199,26 @@ impl ReplayStage {
return;
}
poh_recorder.reached_leader_tick()
let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) =
poh_recorder.reached_leader_tick();
if !reached_leader_tick {
trace!("{} poh_recorder hasn't reached_leader_tick", my_pubkey);
return;
}
(grace_ticks, poh_slot, parent_slot)
};
trace!(
"{} reached_leader_tick: {} poh_slot: {} parent_slot: {}",
"{} reached_leader_tick, poh_slot: {} parent_slot: {}",
my_pubkey,
reached_leader_tick,
poh_slot,
parent_slot,
);
if bank_forks.read().unwrap().get(poh_slot).is_some() {
trace!("{} already have bank in forks at {}", my_pubkey, poh_slot);
warn!("{} already have bank in forks at {}", my_pubkey, poh_slot);
return;
}
@ -242,33 +247,30 @@ impl ReplayStage {
next_leader,
poh_slot
);
// TODO: remove me?
cluster_info.write().unwrap().set_leader(&next_leader);
if next_leader == *my_pubkey && reached_leader_tick {
trace!("{} starting tpu for slot {}", my_pubkey, poh_slot);
datapoint_warn!(
"replay_stage-new_leader",
("count", poh_slot, i64),
("grace", grace_ticks, i64)
);
let tpu_bank = Bank::new_from_parent(&parent, my_pubkey, poh_slot);
bank_forks.write().unwrap().insert(tpu_bank);
if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {
assert_eq!(
bank_forks.read().unwrap().working_bank().slot(),
tpu_bank.slot()
);
debug!(
"poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
my_pubkey,
tpu_bank.slot(),
next_leader
);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
}
// I guess I missed my slot
if next_leader != *my_pubkey {
return;
}
datapoint_warn!(
"replay_stage-new_leader",
("count", poh_slot, i64),
("grace", grace_ticks, i64)
);
let tpu_bank = bank_forks
.write()
.unwrap()
.insert(Bank::new_from_parent(&parent, my_pubkey, poh_slot));
info!(
"poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
my_pubkey,
tpu_bank.slot(),
next_leader
);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
} else {
error!("{} No next leader found", my_pubkey);
}
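
Note (not part of this diff): a condensed sketch of maybe_start_leader after this change (leader-schedule lookup omitted) — each guard returns early, and the TPU bank now comes straight from the new BankForks::insert().

let (reached_leader_tick, grace_ticks, poh_slot, parent_slot) =
    poh_recorder.lock().unwrap().reached_leader_tick();
if !reached_leader_tick {
    return; // PoH has not reached a leader tick yet
}
if bank_forks.read().unwrap().get(poh_slot).is_some() {
    return; // a bank for this slot already exists
}
if next_leader != *my_pubkey {
    return; // someone else's slot; nothing to start
}
datapoint_warn!(
    "replay_stage-new_leader",
    ("count", poh_slot, i64),
    ("grace", grace_ticks, i64)
);
let tpu_bank = bank_forks
    .write()
    .unwrap()
    .insert(Bank::new_from_parent(&parent, my_pubkey, poh_slot));
poh_recorder.lock().unwrap().set_bank(&tpu_bank);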

View File

@ -109,6 +109,10 @@ impl JsonRpcRequestProcessor {
Ok(self.bank().slot())
}
fn get_slot_leader(&self) -> Result<String> {
Ok(self.bank().collector_id().to_string())
}
fn get_transaction_count(&self) -> Result<u64> {
Ok(self.bank().transaction_count() as u64)
}
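
Note (not part of this diff): why collector_id() can stand in for the slot leader here — an assumption drawn from the ReplayStage hunk above, where the leader creates its bank with its own pubkey as the collector.

// The leader builds its bank with itself as collector, so reading the
// collector back from the working bank recovers the slot leader's pubkey.
let tpu_bank = Bank::new_from_parent(&parent, &leader_pubkey, poh_slot);
assert_eq!(*tpu_bank.collector_id(), leader_pubkey);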
@ -512,12 +516,7 @@ impl RpcSol for RpcSolImpl {
}
fn get_slot_leader(&self, meta: Self::Metadata) -> Result<String> {
let cluster_info = meta.cluster_info.read().unwrap();
let leader_data_option = cluster_info.leader_data();
Ok(leader_data_option
.and_then(|leader_data| Some(leader_data.id))
.unwrap_or_default()
.to_string())
meta.request_processor.read().unwrap().get_slot_leader()
}
fn get_epoch_vote_accounts(&self, meta: Self::Metadata) -> Result<Vec<RpcVoteAccountInfo>> {
@ -577,6 +576,7 @@ mod tests {
) -> (MetaIoHandler<Meta>, Meta, Arc<Bank>, Hash, Keypair, Pubkey) {
let (bank_forks, alice) = new_bank_forks();
let bank = bank_forks.read().unwrap().working_bank();
let leader_pubkey = *bank.collector_id();
let exit = Arc::new(AtomicBool::new(false));
let blockhash = bank.confirmed_last_blockhash().0;
@ -595,9 +595,14 @@ mod tests {
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
ContactInfo::default(),
)));
let leader = ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
cluster_info.write().unwrap().insert_info(leader.clone());
cluster_info
.write()
.unwrap()
.insert_info(ContactInfo::new_with_pubkey_socketaddr(
&leader_pubkey,
&socketaddr!("127.0.0.1:1234"),
));
let mut io = MetaIoHandler::default();
let rpc = RpcSolImpl;
@ -606,7 +611,7 @@ mod tests {
request_processor,
cluster_info,
};
(io, meta, bank, blockhash, alice, leader.id)
(io, meta, bank, blockhash, alice, leader_pubkey)
}
#[test]
@ -674,13 +679,12 @@ mod tests {
#[test]
fn test_rpc_get_slot_leader() {
let bob_pubkey = Pubkey::new_rand();
let (io, meta, _bank, _blockhash, _alice, _leader_pubkey) =
let (io, meta, _bank, _blockhash, _alice, leader_pubkey) =
start_rpc_handler_with_tx(&bob_pubkey);
let req = format!(r#"{{"jsonrpc":"2.0","id":1,"method":"getSlotLeader"}}"#);
let res = io.handle_request_sync(&req, meta);
let expected =
format!(r#"{{"jsonrpc":"2.0","result":"11111111111111111111111111111111","id":1}}"#);
let expected = format!(r#"{{"jsonrpc":"2.0","result":"{}","id":1}}"#, leader_pubkey);
let expected: Response =
serde_json::from_str(&expected).expect("expected response deserialization");
let result: Response = serde_json::from_str(&res.expect("actual response"))

View File

@ -11,7 +11,6 @@ use crate::poh_recorder::{PohRecorder, WorkingBankEntries};
use crate::service::Service;
use crate::sigverify_stage::SigVerifyStage;
use crossbeam_channel::unbounded;
use solana_sdk::pubkey::Pubkey;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Receiver};
@ -29,7 +28,6 @@ pub struct Tpu {
impl Tpu {
#[allow(clippy::too_many_arguments)]
pub fn new(
id: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
entry_receiver: Receiver<WorkingBankEntries>,
@ -41,8 +39,6 @@ impl Tpu {
broadcast_type: &BroadcastStageType,
exit: &Arc<AtomicBool>,
) -> Self {
cluster_info.write().unwrap().set_leader(id);
let (packet_sender, packet_receiver) = channel();
let fetch_stage = FetchStage::new_with_sender(
transactions_sockets,
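
Note (not part of this diff): with `id` and the set_leader() call dropped, Tpu::new no longer touches gossip state; a sketch of the trimmed call site (remaining arguments elided), matching the Validator hunk below.

let tpu = Tpu::new(
    &cluster_info,   // no leading &id argument any more
    &poh_recorder,
    entry_receiver,
    // ...sockets, channels, broadcast_type as before...
    &exit,
);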

View File

@ -256,7 +256,6 @@ impl Validator {
}
let tpu = Tpu::new(
&id,
&cluster_info,
&poh_recorder,
entry_receiver,