From 97b1156a7ad51ec6d7c462f3e5c6f56a915e10fb Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 6 Dec 2018 13:52:47 -0700 Subject: [PATCH] Rename Ncp to GossipService And BroadcastStage to BroadcastService since it's not included in the TPU pipeline. --- book/art/fullnode.bob | 8 +-- book/art/tvu.bob | 5 +- book/src/SUMMARY.md | 2 +- book/src/{ncp.md => gossip.md} | 10 +-- src/bin/bench-tps.rs | 13 ++-- src/bin/fullnode.rs | 8 +-- src/bin/replicator.rs | 10 +-- ...roadcast_stage.rs => broadcast_service.rs} | 36 +++++----- src/cluster_info.rs | 69 ++++++++++--------- src/contact_info.rs | 22 +++--- src/crds_gossip_pull.rs | 2 +- src/crds_gossip_push.rs | 2 +- src/fullnode.rs | 40 +++++------ src/{ncp.rs => gossip_service.rs} | 18 ++--- src/lib.rs | 4 +- src/replicator.rs | 10 +-- src/thin_client.rs | 6 +- src/tpu_forwarder.rs | 2 +- src/tvu.rs | 8 +-- src/wallet.rs | 12 ++-- src/window_service.rs | 6 +- tests/data_replicator.rs | 16 ++--- tests/multinode.rs | 56 +++++++-------- 23 files changed, 186 insertions(+), 179 deletions(-) rename book/src/{ncp.md => gossip.md} (90%) rename src/{broadcast_stage.rs => broadcast_service.rs} (90%) rename src/{ncp.rs => gossip_service.rs} (83%) diff --git a/book/art/fullnode.bob b/book/art/fullnode.bob index aaab4d1dd0..6afe5ea519 100644 --- a/book/art/fullnode.bob +++ b/book/art/fullnode.bob @@ -7,9 +7,9 @@ | |<----| | | `----+---` | `------------------` | | | ^ | .------------------. - | | | .-----. | | Validators | - | | | | NCP |<---------->| | - | | | `---+-` | | .------------. | + | | | .----------------. | | Validators | + | | | | Gossip Service +----->| | + | | | `--------+-------` | | .------------. | | | | ^ | | | | | | | | | | v | | | Upstream | | | | .--+---. .-+---. | | | Validators | | @@ -19,7 +19,7 @@ | | | | | .------------. | | | .--+--. .-----------. | | | | | `-------->| TPU +-->| Broadcast +--------->| Downstream | | - | `-----` | Stage | | | | Validators | | + | `-----` | Service | | | | Validators | | | `-----------` | | | | | | | | `------------` | `---------------------------` | | diff --git a/book/art/tvu.bob b/book/art/tvu.bob index 298fcd21fe..4f92319250 100644 --- a/book/art/tvu.bob +++ b/book/art/tvu.bob @@ -17,5 +17,6 @@ | | | | V v .+-----------. .------. - | NCP | | Bank | - `------------` `------` + | Gossip | | Bank | + | Service | `------` + `------------` diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index 00f16d31c6..0bbbf4ae55 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -16,7 +16,7 @@ - [Anatomy of a Fullnode](fullnode.md) - [TPU](tpu.md) - [TVU](tvu.md) - - [NCP](ncp.md) + - [Gossip Service](gossip.md) - [The Runtime](runtime.md) ## Appendix diff --git a/book/src/ncp.md b/book/src/gossip.md similarity index 90% rename from book/src/ncp.md rename to book/src/gossip.md index 474e4dd3cb..d6668ea381 100644 --- a/book/src/ncp.md +++ b/book/src/gossip.md @@ -1,8 +1,8 @@ -# The Network Control Plane +# The Gossip Service -The Network Control Plane (NCP) acts as a gateway to nodes in the control -plane. Fullnodes use the NCP to ensure information is available to all other -nodes in a cluster. The NCP broadcasts information using a gossip protocol. +The Gossip Service acts as a gateway to nodes in the control plane. Fullnodes +use the service to ensure information is available to all other nodes in a cluster. +The service broadcasts information using a gossip protocol. ## Gossip Overview @@ -25,7 +25,7 @@ timestamp) as needed to make sense to the node receiving them. 
If a node receives two records from the same source, it updates its own
copy with whichever record carries the most recent timestamp.
 
-## NCP Interface
+## Gossip Service Interface
 
 ### Push Message
 
diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs
index 17871d6caa..8d21f56680 100644
--- a/src/bin/bench-tps.rs
+++ b/src/bin/bench-tps.rs
@@ -12,8 +12,8 @@ use clap::{App, Arg};
 use rayon::prelude::*;
 use solana::client::mk_client;
 use solana::cluster_info::{ClusterInfo, NodeInfo};
+use solana::gossip_service::GossipService;
 use solana::logger;
-use solana::ncp::Ncp;
 use solana::service::Service;
 use solana::signature::GenKeys;
 use solana::thin_client::{poll_gossip_for_leader, ThinClient};
@@ -642,7 +642,7 @@ fn main() {
     let leader = poll_gossip_for_leader(network, None).expect("unable to find leader on network");
 
     let exit_signal = Arc::new(AtomicBool::new(false));
-    let (nodes, leader, ncp) = converge(&leader, &exit_signal, num_nodes);
+    let (nodes, leader, gossip_service) = converge(&leader, &exit_signal, num_nodes);
 
     if nodes.len() < num_nodes {
         println!(
@@ -825,14 +825,14 @@ fn main() {
     );
 
     // join the cluster_info client threads
-    ncp.join().unwrap();
+    gossip_service.join().unwrap();
 }
 
 fn converge(
     leader: &NodeInfo,
     exit_signal: &Arc<AtomicBool>,
     num_nodes: usize,
-) -> (Vec<NodeInfo>, Option<NodeInfo>, Ncp) {
+) -> (Vec<NodeInfo>, Option<NodeInfo>, GossipService) {
     //lets spy on the network
     let (node, gossip_socket) = ClusterInfo::spy_node();
     let mut spy_cluster_info = ClusterInfo::new(node);
@@ -840,7 +840,8 @@ fn converge(
     spy_cluster_info.set_leader(leader.id);
     let spy_ref = Arc::new(RwLock::new(spy_cluster_info));
     let window = Arc::new(RwLock::new(default_window()));
-    let ncp = Ncp::new(&spy_ref, window, None, gossip_socket, exit_signal.clone());
+    let gossip_service =
+        GossipService::new(&spy_ref, window, None, gossip_socket, exit_signal.clone());
     let mut v: Vec<NodeInfo> = vec![];
     // wait for the network to converge, 30 seconds should be plenty
     for _ in 0..30 {
@@ -866,7 +867,7 @@ fn converge(
         sleep(Duration::new(1, 0));
     }
     let leader = spy_ref.read().unwrap().leader_data().cloned();
-    (v, leader, ncp)
+    (v, leader, gossip_service)
 }
 
 #[cfg(test)]
diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs
index 54cd8de3ff..f586ece803 100644
--- a/src/bin/fullnode.rs
+++ b/src/bin/fullnode.rs
@@ -74,7 +74,7 @@ fn main() {
     let nosigverify = matches.is_present("nosigverify");
     let use_only_bootstrap_leader = matches.is_present("no-leader-rotation");
 
-    let (keypair, vote_account_keypair, ncp) = if let Some(i) = matches.value_of("identity") {
+    let (keypair, vote_account_keypair, gossip) = if let Some(i) = matches.value_of("identity") {
         let path = i.to_string();
         if let Ok(file) = File::open(path.clone()) {
             let parse: serde_json::Result<Config> = serde_json::from_reader(file);
@@ -82,7 +82,7 @@ fn main() {
                 (
                     data.keypair(),
                     data.vote_account_keypair(),
-                    data.node_info.ncp,
+                    data.node_info.gossip,
                 )
             } else {
                 eprintln!("failed to parse {}", path);
@@ -98,12 +98,12 @@ fn main() {
 
     let ledger_path = matches.value_of("ledger").unwrap();
 
-    // socketaddr that is initial pointer into the network's gossip (ncp)
+    // socketaddr that is initial pointer into the network's gossip
     let network = matches
         .value_of("network")
        .map(|network| network.parse().expect("failed to parse network address"));
 
-    let node = Node::new_with_external_ip(keypair.pubkey(), &ncp);
+    let node = Node::new_with_external_ip(keypair.pubkey(), &gossip);
 
     // save off some stuff for airdrop
     let mut node_info = node.info.clone();
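The record-merge rule described in book/src/gossip.md above amounts to last-write-wins on the timestamp the originating node stamped into the record. A minimal sketch of that rule, using a hypothetical `Record` type in place of the crate's actual gossip value types:

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a signed gossip record; the real values
/// live in the crds modules touched later in this patch.
pub struct Record {
    pub origin: String,   // id of the node that created the record
    pub wallclock: u64,   // timestamp assigned by the origin
    pub payload: Vec<u8>, // e.g. a serialized ContactInfo
}

/// Keep whichever copy of an origin's record carries the newest
/// timestamp, as the gossip overview describes.
pub fn merge(table: &mut HashMap<String, Record>, incoming: Record) {
    match table.get(&incoming.origin) {
        // Our copy is at least as recent: drop the incoming record.
        Some(existing) if existing.wallclock >= incoming.wallclock => {}
        // Otherwise adopt the newer record.
        _ => {
            table.insert(incoming.origin.clone(), incoming);
        }
    }
}
```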
diff --git a/src/bin/replicator.rs b/src/bin/replicator.rs
index 459443db20..c95601c6d0 100644
--- a/src/bin/replicator.rs
+++ b/src/bin/replicator.rs
@@ -59,12 +59,12 @@ fn main() {
 
     let ledger_path = matches.value_of("ledger");
 
-    let (keypair, ncp) = if let Some(i) = matches.value_of("identity") {
+    let (keypair, gossip) = if let Some(i) = matches.value_of("identity") {
         let path = i.to_string();
         if let Ok(file) = File::open(path.clone()) {
             let parse: serde_json::Result<Config> = serde_json::from_reader(file);
             if let Ok(data) = parse {
-                (data.keypair(), data.node_info.ncp)
+                (data.keypair(), data.node_info.gossip)
             } else {
                 eprintln!("failed to parse {}", path);
                 exit(1);
@@ -77,12 +77,12 @@ fn main() {
         (Keypair::new(), socketaddr!([127, 0, 0, 1], 8700))
     };
 
-    let node = Node::new_with_external_ip(keypair.pubkey(), &ncp);
+    let node = Node::new_with_external_ip(keypair.pubkey(), &gossip);
 
     println!(
-        "replicating the data with keypair: {:?} ncp:{:?}",
+        "replicating the data with keypair: {:?} gossip:{:?}",
         keypair.pubkey(),
-        ncp
+        gossip
     );
 
     let exit = Arc::new(AtomicBool::new(false));
diff --git a/src/broadcast_stage.rs b/src/broadcast_service.rs
similarity index 90%
rename from src/broadcast_stage.rs
rename to src/broadcast_service.rs
index 010fd58f3b..ab17c92d2f 100644
--- a/src/broadcast_stage.rs
+++ b/src/broadcast_service.rs
@@ -1,4 +1,4 @@
-//! The `broadcast_stage` broadcasts data from a leader node to validators
+//! The `broadcast_service` broadcasts data from a leader node to validators
 //!
 use cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo};
 use counter::Counter;
@@ -24,7 +24,7 @@ use std::time::{Duration, Instant};
 use window::{SharedWindow, WindowIndex, WindowUtil};
 
 #[derive(Debug, PartialEq, Eq, Clone)]
-pub enum BroadcastStageReturnType {
+pub enum BroadcastServiceReturnType {
     LeaderRotation,
     ChannelDisconnected,
 }
@@ -54,7 +54,7 @@ fn broadcast(
         num_entries += entries.len();
         ventries.push(entries);
     }
-    inc_new_counter_info!("broadcast_stage-entries_received", num_entries);
+    inc_new_counter_info!("broadcast_service-entries_received", num_entries);
 
     let to_blobs_start = Instant::now();
     let num_ticks: u64 = ventries
@@ -154,7 +154,7 @@ fn broadcast(
     let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
 
     inc_new_counter_info!(
-        "broadcast_stage-time_ms",
+        "broadcast_service-time_ms",
         duration_as_ms(&now.elapsed()) as usize
     );
     info!(
@@ -163,7 +163,7 @@ fn broadcast(
     );
 
     submit(
-        influxdb::Point::new("broadcast-stage")
+        influxdb::Point::new("broadcast-service")
             .add_field(
                 "transmit-index",
                 influxdb::Value::Integer(transmit_index.data as i64),
@@ -173,7 +173,7 @@ fn broadcast(
     Ok(())
 }
 
-// Implement a destructor for the BroadcastStage thread to signal it exited
+// Implement a destructor for the BroadcastService thread to signal it exited
 // even on panics
 struct Finalizer {
     exit_sender: Arc<AtomicBool>,
 }
@@ -191,11 +191,11 @@ impl Drop for Finalizer {
     }
 }
 
-pub struct BroadcastStage {
-    thread_hdl: JoinHandle<BroadcastStageReturnType>,
+pub struct BroadcastService {
+    thread_hdl: JoinHandle<BroadcastServiceReturnType>,
 }
 
-impl BroadcastStage {
+impl BroadcastService {
     fn run(
         sock: &UdpSocket,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
@@ -205,7 +205,7 @@ impl BroadcastStage {
         receiver: &Receiver<Vec<Entry>>,
         max_tick_height: Option<u64>,
         tick_height: u64,
-    ) -> BroadcastStageReturnType {
+    ) -> BroadcastServiceReturnType {
         let mut transmit_index = WindowIndex {
             data: entry_height,
             coding: entry_height,
@@ -231,7 +231,7 @@ impl BroadcastStage {
             ) {
                 match e {
                     Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
-                        return BroadcastStageReturnType::ChannelDisconnected
+                        return BroadcastServiceReturnType::ChannelDisconnected
} Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? @@ -252,11 +252,11 @@ impl BroadcastStage { /// * `cluster_info` - ClusterInfo structure /// * `window` - Cache of blobs that we have broadcast /// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. - /// * `exit_sender` - Set to true when this stage exits, allows rest of Tpu to exit cleanly. Otherwise, - /// when a Tpu stage closes, it only closes the stages that come after it. The stages + /// * `exit_sender` - Set to true when this service exits, allows rest of Tpu to exit cleanly. + /// Otherwise, when a Tpu closes, it only closes the stages that come after it. The stages /// that come before could be blocked on a receive, and never notice that they need to /// exit. Now, if any stage of the Tpu closes, it will lead to closing the WriteStage (b/c - /// WriteStage is the last stage in the pipeline), which will then close Broadcast stage, + /// WriteStage is the last stage in the pipeline), which will then close Broadcast service, /// which will then close FetchStage in the Tpu, and then the rest of the Tpu, /// completing the cycle. pub fn new( @@ -286,14 +286,14 @@ impl BroadcastStage { ) }).unwrap(); - BroadcastStage { thread_hdl } + Self { thread_hdl } } } -impl Service for BroadcastStage { - type JoinReturnType = BroadcastStageReturnType; +impl Service for BroadcastService { + type JoinReturnType = BroadcastServiceReturnType; - fn join(self) -> thread::Result { + fn join(self) -> thread::Result { self.thread_hdl.join() } } diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 1dfa0ea63f..30fab4d2ff 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -191,10 +191,10 @@ impl ClusterInfo { .into_iter() .map(|node| { format!( - " ncp: {:20} | {}{}\n \ + " gossip: {:20} | {}{}\n \ tpu: {:20} |\n \ rpc: {:20} |\n", - node.ncp.to_string(), + node.gossip.to_string(), node.id, if node.id == leader_id { " <==== leader" @@ -231,7 +231,7 @@ impl ClusterInfo { self.gossip.purge(now); } pub fn convergence(&self) -> usize { - self.ncp_peers().len() + 1 + self.gossip_peers().len() + 1 } pub fn rpc_peers(&self) -> Vec { let me = self.my_data().id; @@ -246,7 +246,7 @@ impl ClusterInfo { .collect() } - pub fn ncp_peers(&self) -> Vec { + pub fn gossip_peers(&self) -> Vec { let me = self.my_data().id; self.gossip .crds @@ -254,7 +254,7 @@ impl ClusterInfo { .values() .filter_map(|x| x.value.contact_info()) .filter(|x| x.id != me) - .filter(|x| ContactInfo::is_valid_address(&x.ncp)) + .filter(|x| ContactInfo::is_valid_address(&x.gossip)) .cloned() .collect() } @@ -508,12 +508,12 @@ impl ClusterInfo { pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec)> { // find a peer that appears to be accepting replication, as indicated // by a valid tvu port location - let valid: Vec<_> = self.ncp_peers(); + let valid: Vec<_> = self.gossip_peers(); if valid.is_empty() { Err(ClusterInfoError::NoPeers)?; } let n = thread_rng().gen::() % valid.len(); - let addr = valid[n].ncp; // send the request to the peer's gossip port + let addr = valid[n].gossip; // send the request to the peer's gossip port let req = Protocol::RequestWindowIndex(self.my_data().clone(), ix); let out = serialize(&req)?; Ok((addr, out)) @@ -530,12 +530,12 @@ impl ClusterInfo { .crds .lookup(&peer_label) .and_then(|v| v.contact_info()) - .map(|peer_info| (peer, filter, peer_info.ncp, self_info)) + 
.map(|peer_info| (peer, filter, peer_info.gossip, self_info)) }).collect(); pr.into_iter() - .map(|(peer, filter, ncp, self_info)| { + .map(|(peer, filter, gossip, self_info)| { self.gossip.mark_pull_request_creation_time(peer, now); - (ncp, Protocol::PullRequest(filter, self_info)) + (gossip, Protocol::PullRequest(filter, self_info)) }).collect() } fn new_push_requests(&mut self) -> Vec<(SocketAddr, Protocol)> { @@ -549,7 +549,7 @@ impl ClusterInfo { .crds .lookup(&peer_label) .and_then(|v| v.contact_info()) - .map(|p| p.ncp) + .map(|p| p.gossip) }).map(|peer| (peer, Protocol::PushMessage(self_id, msgs.clone()))) .collect() } @@ -760,12 +760,12 @@ impl ClusterInfo { // the remote side may not know his public IP:PORT, record what he looks like to us // this may or may not be correct for everybody but it's better than leaving him with // an unspecified address in our table - if from.ncp.ip().is_unspecified() { - inc_new_counter_info!("cluster_info-window-request-updates-unspec-ncp", 1); - from.ncp = *from_addr; + if from.gossip.ip().is_unspecified() { + inc_new_counter_info!("cluster_info-window-request-updates-unspec-gossip", 1); + from.gossip = *from_addr; } inc_new_counter_info!("cluster_info-pull_request-rsp", len); - to_blob(rsp, from.ncp).ok().into_iter().collect() + to_blob(rsp, from.gossip).ok().into_iter().collect() } } fn handle_pull_response(me: &Arc>, from: Pubkey, data: Vec) { @@ -810,7 +810,7 @@ impl ClusterInfo { }; prune_msg.sign(&me.read().unwrap().keypair); let rsp = Protocol::PruneMessage(self_id, prune_msg); - to_blob(rsp, ci.ncp).ok() + to_blob(rsp, ci.gossip).ok() }).into_iter() .collect(); let mut blobs: Vec<_> = pushes @@ -1056,13 +1056,16 @@ impl Node { }, } } - pub fn new_with_external_ip(pubkey: Pubkey, ncp: &SocketAddr) -> Node { + pub fn new_with_external_ip(pubkey: Pubkey, gossip_addr: &SocketAddr) -> Node { fn bind() -> (u16, UdpSocket) { bind_in_range(FULLNODE_PORT_RANGE).expect("Failed to bind") }; - let (gossip_port, gossip) = if ncp.port() != 0 { - (ncp.port(), bind_to(ncp.port(), false).expect("ncp bind")) + let (gossip_port, gossip) = if gossip_addr.port() != 0 { + ( + gossip_addr.port(), + bind_to(gossip_addr.port(), false).expect("gossip_addr bind"), + ) } else { bind() }; @@ -1080,12 +1083,12 @@ impl Node { let info = NodeInfo::new( pubkey, - SocketAddr::new(ncp.ip(), gossip_port), - SocketAddr::new(ncp.ip(), replicate_port), - SocketAddr::new(ncp.ip(), transaction_port), - SocketAddr::new(ncp.ip(), storage_port), - SocketAddr::new(ncp.ip(), RPC_PORT), - SocketAddr::new(ncp.ip(), RPC_PORT + 1), + SocketAddr::new(gossip_addr.ip(), gossip_port), + SocketAddr::new(gossip_addr.ip(), replicate_port), + SocketAddr::new(gossip_addr.ip(), transaction_port), + SocketAddr::new(gossip_addr.ip(), storage_port), + SocketAddr::new(gossip_addr.ip(), RPC_PORT), + SocketAddr::new(gossip_addr.ip(), RPC_PORT + 1), 0, ); trace!("new NodeInfo: {:?}", info); @@ -1171,10 +1174,10 @@ mod tests { let rv = cluster_info.window_index_request(0); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); - let ncp = socketaddr!([127, 0, 0, 1], 1234); + let gossip_addr = socketaddr!([127, 0, 0, 1], 1234); let nxt = NodeInfo::new( Keypair::new().pubkey(), - ncp, + gossip_addr, socketaddr!([127, 0, 0, 1], 1235), socketaddr!([127, 0, 0, 1], 1236), socketaddr!([127, 0, 0, 1], 1237), @@ -1184,13 +1187,13 @@ mod tests { ); cluster_info.insert_info(nxt.clone()); let rv = cluster_info.window_index_request(0).unwrap(); - assert_eq!(nxt.ncp, ncp); - assert_eq!(rv.0, 
nxt.ncp); + assert_eq!(nxt.gossip, gossip_addr); + assert_eq!(rv.0, nxt.gossip); - let ncp2 = socketaddr!([127, 0, 0, 2], 1234); + let gossip_addr2 = socketaddr!([127, 0, 0, 2], 1234); let nxt = NodeInfo::new( Keypair::new().pubkey(), - ncp2, + gossip_addr2, socketaddr!([127, 0, 0, 1], 1235), socketaddr!([127, 0, 0, 1], 1236), socketaddr!([127, 0, 0, 1], 1237), @@ -1204,10 +1207,10 @@ mod tests { while !one || !two { //this randomly picks an option, so eventually it should pick both let rv = cluster_info.window_index_request(0).unwrap(); - if rv.0 == ncp { + if rv.0 == gossip_addr { one = true; } - if rv.0 == ncp2 { + if rv.0 == gossip_addr2 { two = true; } } diff --git a/src/contact_info.rs b/src/contact_info.rs index c55d961bb9..889aae330b 100644 --- a/src/contact_info.rs +++ b/src/contact_info.rs @@ -12,7 +12,7 @@ pub struct ContactInfo { /// signature of this ContactInfo pub signature: Signature, /// gossip address - pub ncp: SocketAddr, + pub gossip: SocketAddr, /// address to connect to for replication pub tvu: SocketAddr, /// transactions address @@ -48,7 +48,7 @@ impl Default for ContactInfo { fn default() -> Self { ContactInfo { id: Pubkey::default(), - ncp: socketaddr_any!(), + gossip: socketaddr_any!(), tvu: socketaddr_any!(), tpu: socketaddr_any!(), storage_addr: socketaddr_any!(), @@ -63,7 +63,7 @@ impl Default for ContactInfo { impl ContactInfo { pub fn new( id: Pubkey, - ncp: SocketAddr, + gossip: SocketAddr, tvu: SocketAddr, tpu: SocketAddr, storage_addr: SocketAddr, @@ -74,7 +74,7 @@ impl ContactInfo { ContactInfo { id, signature: Signature::default(), - ncp, + gossip, tvu, tpu, storage_addr, @@ -175,7 +175,7 @@ impl Signable for ContactInfo { #[derive(Serialize)] struct SignData { id: Pubkey, - ncp: SocketAddr, + gossip: SocketAddr, tvu: SocketAddr, tpu: SocketAddr, storage_addr: SocketAddr, @@ -187,7 +187,7 @@ impl Signable for ContactInfo { let me = self; let data = SignData { id: me.id, - ncp: me.ncp, + gossip: me.gossip, tvu: me.tvu, tpu: me.tpu, storage_addr: me.storage_addr, @@ -227,7 +227,7 @@ mod tests { #[test] fn test_default() { let ci = ContactInfo::default(); - assert!(ci.ncp.ip().is_unspecified()); + assert!(ci.gossip.ip().is_unspecified()); assert!(ci.tvu.ip().is_unspecified()); assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); @@ -237,7 +237,7 @@ mod tests { #[test] fn test_multicast() { let ci = ContactInfo::new_multicast(); - assert!(ci.ncp.ip().is_multicast()); + assert!(ci.gossip.ip().is_multicast()); assert!(ci.tvu.ip().is_multicast()); assert!(ci.rpc.ip().is_multicast()); assert!(ci.rpc_pubsub.ip().is_multicast()); @@ -248,7 +248,7 @@ mod tests { fn test_entry_point() { let addr = socketaddr!("127.0.0.1:10"); let ci = ContactInfo::new_entry_point(&addr); - assert_eq!(ci.ncp, addr); + assert_eq!(ci.gossip, addr); assert!(ci.tvu.ip().is_unspecified()); assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); @@ -260,7 +260,7 @@ mod tests { let addr = socketaddr!("127.0.0.1:10"); let ci = ContactInfo::new_with_socketaddr(&addr); assert_eq!(ci.tpu, addr); - assert_eq!(ci.ncp.port(), 11); + assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.tvu.port(), 12); assert_eq!(ci.rpc.port(), 8899); assert_eq!(ci.rpc_pubsub.port(), 8900); @@ -274,7 +274,7 @@ mod tests { &socketaddr!("127.0.0.1:1234"), ); assert_eq!(d1.id, keypair.pubkey()); - assert_eq!(d1.ncp, socketaddr!("127.0.0.1:1235")); + assert_eq!(d1.gossip, socketaddr!("127.0.0.1:1235")); assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236")); 
assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234")); assert_eq!(d1.rpc, socketaddr!("127.0.0.1:8899")); diff --git a/src/crds_gossip_pull.rs b/src/crds_gossip_pull.rs index a022691740..31debf5d0d 100644 --- a/src/crds_gossip_pull.rs +++ b/src/crds_gossip_pull.rs @@ -59,7 +59,7 @@ impl CrdsGossipPull { .values() .filter_map(|v| v.value.contact_info()) .filter(|v| { - v.id != self_id && !v.ncp.ip().is_unspecified() && !v.ncp.ip().is_multicast() + v.id != self_id && !v.gossip.ip().is_unspecified() && !v.gossip.ip().is_multicast() }).map(|item| { let req_time: u64 = *self.pull_request_time.get(&item.id).unwrap_or(&0); let weight = cmp::max( diff --git a/src/crds_gossip_push.rs b/src/crds_gossip_push.rs index 94b8409603..dfc9c565eb 100644 --- a/src/crds_gossip_push.rs +++ b/src/crds_gossip_push.rs @@ -184,7 +184,7 @@ impl CrdsGossipPush { continue; } if let Some(contact) = val.1.value.contact_info() { - if !ContactInfo::is_valid_address(&contact.ncp) { + if !ContactInfo::is_valid_address(&contact.gossip) { continue; } } diff --git a/src/fullnode.rs b/src/fullnode.rs index de06ec4ebc..03c50bcf4d 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -1,12 +1,12 @@ //! The `fullnode` module hosts all the fullnode microservices. use bank::Bank; -use broadcast_stage::BroadcastStage; +use broadcast_service::BroadcastService; use cluster_info::{ClusterInfo, Node, NodeInfo}; use db_ledger::{write_entries_to_ledger, DbLedger}; +use gossip_service::GossipService; use leader_scheduler::LeaderScheduler; use ledger::read_ledger; -use ncp::Ncp; use rpc::JsonRpcService; use rpc_pubsub::PubSubService; use service::Service; @@ -31,19 +31,19 @@ pub enum NodeRole { pub struct LeaderServices { tpu: Tpu, - broadcast_stage: BroadcastStage, + broadcast_service: BroadcastService, } impl LeaderServices { - fn new(tpu: Tpu, broadcast_stage: BroadcastStage) -> Self { + fn new(tpu: Tpu, broadcast_service: BroadcastService) -> Self { LeaderServices { tpu, - broadcast_stage, + broadcast_service, } } pub fn join(self) -> Result> { - self.broadcast_stage.join()?; + self.broadcast_service.join()?; self.tpu.join() } @@ -94,7 +94,7 @@ pub struct Fullnode { exit: Arc, rpc_service: Option, rpc_pubsub_service: Option, - ncp: Ncp, + gossip_service: GossipService, bank: Arc, cluster_info: Arc>, ledger_path: String, @@ -164,7 +164,7 @@ impl Fullnode { info!( "starting... 
local gossip address: {} (advertising {})", - local_gossip_addr, node.info.ncp + local_gossip_addr, node.info.gossip ); let mut rpc_addr = node.info.rpc; if let Some(port) = rpc_port { @@ -240,7 +240,7 @@ impl Fullnode { let (rpc_service, rpc_pubsub_service) = Self::startup_rpc_services(rpc_addr, rpc_pubsub_addr, &bank, &cluster_info); - let ncp = Ncp::new( + let gossip_service = GossipService::new( &cluster_info, shared_window.clone(), Some(ledger_path), @@ -324,7 +324,7 @@ impl Fullnode { last_entry_id, ); - let broadcast_stage = BroadcastStage::new( + let broadcast_service = BroadcastService::new( node.sockets .broadcast .try_clone() @@ -338,7 +338,7 @@ impl Fullnode { bank.tick_height(), tpu_exit, ); - let leader_state = LeaderServices::new(tpu, broadcast_stage); + let leader_state = LeaderServices::new(tpu, broadcast_service); Some(NodeRole::Leader(leader_state)) }; @@ -349,7 +349,7 @@ impl Fullnode { shared_window, bank, sigverify_disabled, - ncp, + gossip_service, rpc_service: Some(rpc_service), rpc_pubsub_service: Some(rpc_pubsub_service), node_role, @@ -488,7 +488,7 @@ impl Fullnode { &last_id, ); - let broadcast_stage = BroadcastStage::new( + let broadcast_service = BroadcastService::new( self.broadcast_socket .try_clone() .expect("Failed to clone broadcast socket"), @@ -501,7 +501,7 @@ impl Fullnode { tick_height, tpu_exit, ); - let leader_state = LeaderServices::new(tpu, broadcast_stage); + let leader_state = LeaderServices::new(tpu, broadcast_service); self.node_role = Some(NodeRole::Leader(leader_state)); } @@ -626,7 +626,7 @@ impl Service for Fullnode { rpc_pubsub_service.join()?; } - self.ncp.join()?; + self.gossip_service.join()?; match self.node_role { Some(NodeRole::Validator(validator_service)) => { @@ -796,7 +796,7 @@ mod tests { &bootstrap_leader_ledger_path, Arc::new(bootstrap_leader_keypair), Arc::new(Keypair::new()), - Some(bootstrap_leader_info.ncp), + Some(bootstrap_leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, @@ -895,7 +895,7 @@ mod tests { &bootstrap_leader_ledger_path, bootstrap_leader_keypair, leader_vote_account_keypair, - Some(bootstrap_leader_info.ncp), + Some(bootstrap_leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, @@ -914,7 +914,7 @@ mod tests { &validator_ledger_path, Arc::new(validator_keypair), Arc::new(validator_vote_account_keypair), - Some(bootstrap_leader_info.ncp), + Some(bootstrap_leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, @@ -942,7 +942,7 @@ mod tests { let leader_keypair = Keypair::new(); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_id = leader_node.info.id; - let leader_ncp = leader_node.info.ncp; + let leader_gossip = leader_node.info.gossip; // Create validator identity let num_ending_ticks = 1; @@ -1001,7 +1001,7 @@ mod tests { &validator_ledger_path, Arc::new(validator_keypair), Arc::new(validator_vote_account_keypair), - Some(leader_ncp), + Some(leader_gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, diff --git a/src/ncp.rs b/src/gossip_service.rs similarity index 83% rename from src/ncp.rs rename to src/gossip_service.rs index 738534d251..726ef4748b 100644 --- a/src/ncp.rs +++ b/src/gossip_service.rs @@ -1,4 +1,4 @@ -//! The `ncp` module implements the network control plane. +//! The `gossip_service` module implements the network control plane. 
use cluster_info::ClusterInfo; use service::Service; @@ -10,12 +10,12 @@ use std::thread::{self, JoinHandle}; use streamer; use window::SharedWindow; -pub struct Ncp { +pub struct GossipService { exit: Arc, thread_hdls: Vec>, } -impl Ncp { +impl GossipService { pub fn new( cluster_info: &Arc>, window: SharedWindow, @@ -26,14 +26,14 @@ impl Ncp { let (request_sender, request_receiver) = channel(); let gossip_socket = Arc::new(gossip_socket); trace!( - "Ncp: id: {}, listening on: {:?}", + "GossipService: id: {}, listening on: {:?}", &cluster_info.read().unwrap().my_data().id, gossip_socket.local_addr().unwrap() ); let t_receiver = streamer::blob_receiver(gossip_socket.clone(), exit.clone(), request_sender); let (response_sender, response_receiver) = channel(); - let t_responder = streamer::responder("ncp", gossip_socket, response_receiver); + let t_responder = streamer::responder("gossip", gossip_socket, response_receiver); let t_listen = ClusterInfo::listen( cluster_info.clone(), window, @@ -44,7 +44,7 @@ impl Ncp { ); let t_gossip = ClusterInfo::gossip(cluster_info.clone(), response_sender, exit.clone()); let thread_hdls = vec![t_receiver, t_responder, t_listen, t_gossip]; - Ncp { exit, thread_hdls } + Self { exit, thread_hdls } } pub fn close(self) -> thread::Result<()> { @@ -53,7 +53,7 @@ impl Ncp { } } -impl Service for Ncp { +impl Service for GossipService { type JoinReturnType = (); fn join(self) -> thread::Result<()> { @@ -66,8 +66,8 @@ impl Service for Ncp { #[cfg(test)] mod tests { + use super::*; use cluster_info::{ClusterInfo, Node}; - use ncp::Ncp; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -80,7 +80,7 @@ mod tests { let cluster_info = ClusterInfo::new(tn.info.clone()); let c = Arc::new(RwLock::new(cluster_info)); let w = Arc::new(RwLock::new(vec![])); - let d = Ncp::new(&c, w, None, tn.sockets.gossip, exit.clone()); + let d = GossipService::new(&c, w, None, tn.sockets.gossip, exit.clone()); d.close().expect("thread join"); } } diff --git a/src/lib.rs b/src/lib.rs index d0377e4cfa..e18d59221a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,7 @@ pub mod bank; pub mod banking_stage; pub mod blob_fetch_stage; pub mod bloom; -pub mod broadcast_stage; +pub mod broadcast_service; #[cfg(feature = "chacha")] pub mod chacha; #[cfg(all(feature = "chacha", feature = "cuda"))] @@ -38,12 +38,12 @@ pub mod entry; pub mod erasure; pub mod fetch_stage; pub mod fullnode; +pub mod gossip_service; pub mod leader_scheduler; pub mod ledger; pub mod ledger_write_stage; pub mod logger; pub mod mint; -pub mod ncp; pub mod netutil; pub mod packet; pub mod poh; diff --git a/src/replicator.rs b/src/replicator.rs index f1f00ec65c..73c5c64e26 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -1,8 +1,8 @@ use blob_fetch_stage::BlobFetchStage; use cluster_info::{ClusterInfo, Node, NodeInfo}; use db_ledger::DbLedger; +use gossip_service::GossipService; use leader_scheduler::LeaderScheduler; -use ncp::Ncp; use service::Service; use solana_sdk::hash::{Hash, Hasher}; use std::fs::File; @@ -28,7 +28,7 @@ use window; use window_service::window_service; pub struct Replicator { - ncp: Ncp, + gossip_service: GossipService, fetch_stage: BlobFetchStage, store_ledger_stage: StoreLedgerStage, t_window: JoinHandle<()>, @@ -134,7 +134,7 @@ impl Replicator { let store_ledger_stage = StoreLedgerStage::new(entry_window_receiver, ledger_path); - let ncp = Ncp::new( + let gossip_service = GossipService::new( &cluster_info, shared_window.clone(), ledger_path, @@ -147,7 +147,7 @@ impl 
Replicator { ( Replicator { - ncp, + gossip_service, fetch_stage, store_ledger_stage, t_window, @@ -158,7 +158,7 @@ impl Replicator { } pub fn join(self) { - self.ncp.join().unwrap(); + self.gossip_service.join().unwrap(); self.fetch_stage.join().unwrap(); self.t_window.join().unwrap(); self.store_ledger_stage.join().unwrap(); diff --git a/src/thin_client.rs b/src/thin_client.rs index 892b53f6ce..568eef29f0 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -7,8 +7,8 @@ use bank::Bank; use bincode::serialize; use bs58; use cluster_info::{ClusterInfo, ClusterInfoError, NodeInfo}; +use gossip_service::GossipService; use log::Level; -use ncp::Ncp; use packet::PACKET_DATA_SIZE; use result::{Error, Result}; use rpc_request::{RpcClient, RpcRequest}; @@ -347,7 +347,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R let my_addr = gossip_socket.local_addr().unwrap(); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node))); let window = Arc::new(RwLock::new(vec![])); - let ncp = Ncp::new( + let gossip_service = GossipService::new( &cluster_info.clone(), window, None, @@ -390,7 +390,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R sleep(Duration::from_millis(100)); } - ncp.close()?; + gossip_service.close()?; if log_enabled!(Level::Trace) { trace!("{}", cluster_info.read().unwrap().node_info_trace()); diff --git a/src/tpu_forwarder.rs b/src/tpu_forwarder.rs index 9c552046a2..8a110645d3 100644 --- a/src/tpu_forwarder.rs +++ b/src/tpu_forwarder.rs @@ -160,7 +160,7 @@ mod tests { sleep(Duration::from_millis(100)); let mut data = vec![0u8; 64]; - // should be nothing on any socket ncp + // should be nothing on any gossip socket assert!(nodes[0].0.recv_from(&mut data).is_err()); assert!(nodes[1].0.recv_from(&mut data).is_err()); assert!(nodes[2].0.recv_from(&mut data).is_err()); diff --git a/src/tvu.rs b/src/tvu.rs index 16cef32019..2602d7003f 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -169,11 +169,11 @@ pub mod tests { use cluster_info::{ClusterInfo, Node}; use db_ledger::DbLedger; use entry::Entry; + use gossip_service::GossipService; use leader_scheduler::LeaderScheduler; use ledger::get_tmp_ledger_path; use logger; use mint::Mint; - use ncp::Ncp; use packet::SharedBlob; use rocksdb::{Options, DB}; use service::Service; @@ -195,10 +195,10 @@ pub mod tests { cluster_info: Arc>, gossip: UdpSocket, exit: Arc, - ) -> (Ncp, SharedWindow) { + ) -> (GossipService, SharedWindow) { let window = Arc::new(RwLock::new(window::default_window())); - let ncp = Ncp::new(&cluster_info, window.clone(), None, gossip, exit); - (ncp, window) + let gossip_service = GossipService::new(&cluster_info, window.clone(), None, gossip, exit); + (gossip_service, window) } /// Test that message sent from leader to target1 and replicated to target2 diff --git a/src/wallet.rs b/src/wallet.rs index e7fa56909b..5fbc907bf4 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -1128,7 +1128,7 @@ mod tests { let drone_addr = receiver.recv().unwrap(); let mut config = WalletConfig::default(); - config.network = leader_data.ncp; + config.network = leader_data.gossip; config.drone_port = Some(drone_addr.port()); let tokens = 50; @@ -1199,7 +1199,7 @@ mod tests { let drone_addr = receiver.recv().unwrap(); let mut bob_config = WalletConfig::default(); - bob_config.network = leader_data.ncp; + bob_config.network = leader_data.gossip; bob_config.drone_port = Some(drone_addr.port()); bob_config.command = WalletCommand::AirDrop(50); @@ -1282,11 +1282,11 @@ mod tests { let 
rpc_client = RpcClient::new_from_socket(leader_data.rpc); let mut config_payer = WalletConfig::default(); - config_payer.network = leader_data.ncp; + config_payer.network = leader_data.gossip; config_payer.drone_port = Some(drone_addr.port()); let mut config_witness = WalletConfig::default(); - config_witness.network = leader_data.ncp; + config_witness.network = leader_data.gossip; config_witness.drone_port = Some(drone_addr.port()); assert_ne!(config_payer.id.pubkey(), config_witness.id.pubkey()); @@ -1527,11 +1527,11 @@ mod tests { let rpc_client = RpcClient::new_from_socket(leader_data.rpc); let mut config_payer = WalletConfig::default(); - config_payer.network = leader_data.ncp; + config_payer.network = leader_data.gossip; config_payer.drone_port = Some(drone_addr.port()); let mut config_witness = WalletConfig::default(); - config_witness.network = leader_data.ncp; + config_witness.network = leader_data.gossip; config_witness.drone_port = Some(drone_addr.port()); assert_ne!(config_payer.id.pubkey(), config_witness.id.pubkey()); diff --git a/src/window_service.rs b/src/window_service.rs index d4ec1379d0..aa62624d75 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -298,7 +298,7 @@ mod test { let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let mut num_blobs_to_make = 10; - let gossip_address = &tn.info.ncp; + let gossip_address = &tn.info.gossip; let msgs = make_consecutive_blobs( me_id, num_blobs_to_make, @@ -375,7 +375,7 @@ mod test { w.set_id(&me_id).unwrap(); assert_eq!(i, w.index().unwrap()); w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.info.ncp); + w.meta.set_addr(&tn.info.gossip); } msgs.push(b); } @@ -392,7 +392,7 @@ mod test { w.set_id(&me_id).unwrap(); assert_eq!(i, w.index().unwrap()); w.meta.size = PACKET_DATA_SIZE; - w.meta.set_addr(&tn.info.ncp); + w.meta.set_addr(&tn.info.gossip); } msgs1.push(b); } diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 5472491f25..794358b7fd 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -6,8 +6,8 @@ extern crate solana_sdk; use rayon::iter::*; use solana::cluster_info::{ClusterInfo, Node}; +use solana::gossip_service::GossipService; use solana::logger; -use solana::ncp::Ncp; use solana::packet::{Blob, SharedBlob}; use solana::result; use solana::service::Service; @@ -19,13 +19,13 @@ use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::time::Duration; -fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { +fn test_node(exit: Arc) -> (Arc>, GossipService, UdpSocket) { let keypair = Keypair::new(); let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair)); let c = Arc::new(RwLock::new(cluster_info)); let w = Arc::new(RwLock::new(vec![])); - let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit); + let d = GossipService::new(&c.clone(), w, None, tn.sockets.gossip, exit); let _ = c.read().unwrap().my_data(); (c, d, tn.sockets.replicate.pop().unwrap()) } @@ -36,7 +36,7 @@ fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket /// tests that actually use this function are below fn run_gossip_topo(num: usize, topo: F) where - F: Fn(&Vec<(Arc>, Ncp, UdpSocket)>) -> (), + F: Fn(&Vec<(Arc>, GossipService, UdpSocket)>) -> (), { let exit = Arc::new(AtomicBool::new(false)); let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect(); @@ -46,7 +46,7 @@ where done = true; let total: usize = listen .iter() - .map(|v| 
v.0.read().unwrap().ncp_peers().len()) + .map(|v| v.0.read().unwrap().gossip_peers().len()) .sum(); if (total + num) * 10 > num * num * 9 { done = true; @@ -165,9 +165,9 @@ pub fn cluster_info_retransmit() -> result::Result<()> { trace!("waiting to converge:"); let mut done = false; for _ in 0..30 { - done = c1.read().unwrap().ncp_peers().len() == num - 1 - && c2.read().unwrap().ncp_peers().len() == num - 1 - && c3.read().unwrap().ncp_peers().len() == num - 1; + done = c1.read().unwrap().gossip_peers().len() == num - 1 + && c2.read().unwrap().gossip_peers().len() == num - 1 + && c3.read().unwrap().gossip_peers().len() == num - 1; if done { break; } diff --git a/tests/multinode.rs b/tests/multinode.rs index 74ae809c57..6f5a57a1ff 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -12,6 +12,7 @@ use solana::contact_info::ContactInfo; use solana::db_ledger::DbLedger; use solana::entry::{reconstruct_entries_from_blobs, Entry}; use solana::fullnode::{Fullnode, FullnodeReturnType}; +use solana::gossip_service::GossipService; use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; use solana::ledger::{ create_tmp_genesis, create_tmp_sample_ledger, read_ledger, tmp_copy_ledger, LedgerWindow, @@ -19,7 +20,6 @@ use solana::ledger::{ }; use solana::logger; use solana::mint::Mint; -use solana::ncp::Ncp; use solana::packet::SharedBlob; use solana::poh_service::NUM_TICKS_PER_SECOND; use solana::result; @@ -41,7 +41,7 @@ use std::sync::{Arc, RwLock}; use std::thread::{sleep, Builder, JoinHandle}; use std::time::{Duration, Instant}; -fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { +fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc>, Pubkey) { let keypair = Keypair::new(); let exit = Arc::new(AtomicBool::new(false)); let mut spy = Node::new_localhost_with_pubkey(keypair.pubkey()); @@ -53,7 +53,7 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { spy_cluster_info.set_leader(leader.id); let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info)); let spy_window = Arc::new(RwLock::new(default_window())); - let ncp = Ncp::new( + let gossip_service = GossipService::new( &spy_cluster_info_ref, spy_window, None, @@ -61,10 +61,12 @@ fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { exit.clone(), ); - (ncp, spy_cluster_info_ref, me) + (gossip_service, spy_cluster_info_ref, me) } -fn make_listening_node(leader: &NodeInfo) -> (Ncp, Arc>, Node, Pubkey) { +fn make_listening_node( + leader: &NodeInfo, +) -> (GossipService, Arc>, Node, Pubkey) { let keypair = Keypair::new(); let exit = Arc::new(AtomicBool::new(false)); let new_node = Node::new_localhost_with_pubkey(keypair.pubkey()); @@ -75,7 +77,7 @@ fn make_listening_node(leader: &NodeInfo) -> (Ncp, Arc>, Nod new_node_cluster_info.set_leader(leader.id); let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info)); let new_node_window = Arc::new(RwLock::new(default_window())); - let ncp = Ncp::new( + let gossip_service = GossipService::new( &new_node_cluster_info_ref, new_node_window, None, @@ -87,12 +89,12 @@ fn make_listening_node(leader: &NodeInfo) -> (Ncp, Arc>, Nod exit.clone(), ); - (ncp, new_node_cluster_info_ref, new_node, me) + (gossip_service, new_node_cluster_info_ref, new_node, me) } fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { //lets spy on the network - let (ncp, spy_ref, _) = make_spy_node(leader); + let (gossip_service, spy_ref, _) = make_spy_node(leader); //wait for the network to converge let mut converged = false; @@ 
-108,7 +110,7 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { sleep(Duration::new(1, 0)); } assert!(converged); - ncp.close().unwrap(); + gossip_service.close().unwrap(); rv } @@ -170,7 +172,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { &zero_ledger_path, keypair, Arc::new(Keypair::new()), - Some(leader_data.ncp), + Some(leader_data.gossip), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), None, @@ -275,7 +277,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { &ledger_path, keypair, Arc::new(Keypair::new()), - Some(leader_data.ncp), + Some(leader_data.gossip), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), None, @@ -313,7 +315,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { &zero_ledger_path, keypair, Arc::new(Keypair::new()), - Some(leader_data.ncp), + Some(leader_data.gossip), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), None, @@ -407,7 +409,7 @@ fn test_multi_node_basic() { &ledger_path, keypair, Arc::new(Keypair::new()), - Some(leader_data.ncp), + Some(leader_data.gossip), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), None, @@ -483,7 +485,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { &ledger_path, keypair, Arc::new(Keypair::new()), - Some(leader_data.ncp), + Some(leader_data.gossip), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), None, @@ -571,7 +573,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { &stale_ledger_path, keypair, Arc::new(Keypair::new()), - Some(leader_data.ncp), + Some(leader_data.gossip), false, LeaderScheduler::from_bootstrap_leader(leader_data.id), None, @@ -702,7 +704,7 @@ fn test_multi_node_dynamic_network() { &ledger_path, Arc::new(keypair), Arc::new(Keypair::new()), - Some(leader_data.ncp), + Some(leader_data.gossip), true, LeaderScheduler::from_bootstrap_leader(leader_pubkey), None, @@ -848,7 +850,7 @@ fn test_leader_to_validator_transition() { &leader_ledger_path, leader_keypair, Arc::new(Keypair::new()), - Some(leader_info.ncp), + Some(leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, @@ -856,7 +858,7 @@ fn test_leader_to_validator_transition() { // Make an extra node for our leader to broadcast to, // who won't vote and mess with our leader's entry count - let (ncp, spy_node, _) = make_spy_node(&leader_info); + let (gossip_service, spy_node, _) = make_spy_node(&leader_info); // Wait for the leader to see the spy node let mut converged = false; @@ -921,7 +923,7 @@ fn test_leader_to_validator_transition() { assert_eq!(bank.tick_height(), bootstrap_height); // Shut down - ncp.close().unwrap(); + gossip_service.close().unwrap(); leader.close().unwrap(); remove_dir_all(leader_ledger_path).unwrap(); } @@ -988,7 +990,7 @@ fn test_leader_validator_basic() { &validator_ledger_path, validator_keypair, Arc::new(vote_account_keypair), - Some(leader_info.ncp), + Some(leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, @@ -1000,7 +1002,7 @@ fn test_leader_validator_basic() { &leader_ledger_path, leader_keypair, Arc::new(Keypair::new()), - Some(leader_info.ncp), + Some(leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, @@ -1177,7 +1179,7 @@ fn test_dropped_handoff_recovery() { &bootstrap_leader_ledger_path, bootstrap_leader_keypair, Arc::new(Keypair::new()), - Some(bootstrap_leader_info.ncp), + Some(bootstrap_leader_info.gossip), false, 
LeaderScheduler::new(&leader_scheduler_config), None, @@ -1200,7 +1202,7 @@ fn test_dropped_handoff_recovery() { &validator_ledger_path, kp, Arc::new(Keypair::new()), - Some(bootstrap_leader_info.ncp), + Some(bootstrap_leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, @@ -1226,7 +1228,7 @@ fn test_dropped_handoff_recovery() { &next_leader_ledger_path, next_leader_keypair, Arc::new(vote_account_keypair), - Some(bootstrap_leader_info.ncp), + Some(bootstrap_leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, @@ -1364,7 +1366,7 @@ fn test_full_leader_validator_network() { &validator_ledger_path, Arc::new(kp), Arc::new(vote_account_keypairs.pop_front().unwrap()), - Some(bootstrap_leader_info.ncp), + Some(bootstrap_leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, @@ -1380,7 +1382,7 @@ fn test_full_leader_validator_network() { &bootstrap_leader_ledger_path, Arc::new(leader_keypair), Arc::new(leader_vote_keypair), - Some(bootstrap_leader_info.ncp), + Some(bootstrap_leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None, @@ -1562,7 +1564,7 @@ fn test_broadcast_last_tick() { &bootstrap_leader_ledger_path, Arc::new(bootstrap_leader_keypair), Arc::new(Keypair::new()), - Some(bootstrap_leader_info.ncp), + Some(bootstrap_leader_info.gossip), false, LeaderScheduler::new(&leader_scheduler_config), None,
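
After this patch, standing up the renamed service end to end still follows the shape of `test_node` in tests/data_replicator.rs and `make_spy_node` in tests/multinode.rs above; only the `Ncp` type name has changed. A minimal sketch under those assumptions (`watch_gossip` is a hypothetical helper, not part of the patch):

```rust
use solana::cluster_info::{ClusterInfo, Node};
use solana::gossip_service::GossipService;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};

// Hypothetical helper: start gossip for `node`, report the peers it can
// see, then shut the service down.
fn watch_gossip(node: Node) {
    let exit = Arc::new(AtomicBool::new(false));
    let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node.info.clone())));
    let window = Arc::new(RwLock::new(vec![]));
    // Same argument list Ncp::new took; only the type name changed.
    let gossip_service =
        GossipService::new(&cluster_info, window, None, node.sockets.gossip, exit);
    println!(
        "visible gossip peers: {}",
        cluster_info.read().unwrap().gossip_peers().len()
    );
    gossip_service.close().expect("thread join");
}
```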