From a4b6d181a2fe9fd77a5a27fd8886834c5f93f350 Mon Sep 17 00:00:00 2001 From: Carl Date: Mon, 11 Mar 2019 12:46:30 -0700 Subject: [PATCH] rename forwarder ports to tpu_via_blobs --- core/src/banking_stage.rs | 8 ++++---- core/src/cluster_info.rs | 14 +++++++------- core/src/contact_info.rs | 24 ++++++++++++------------ core/src/fetch_stage.rs | 16 ++++++++-------- core/src/fullnode.rs | 2 +- core/src/tpu.rs | 4 ++-- 6 files changed, 34 insertions(+), 34 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 19c3c6b659..3828f7f1fa 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -77,7 +77,7 @@ impl BankingStage { fn forward_unprocessed_packets( socket: &std::net::UdpSocket, - forwarder: &std::net::SocketAddr, + tpu_via_blobs: &std::net::SocketAddr, unprocessed_packets: &[(SharedPackets, usize)], ) -> std::io::Result<()> { let locked_packets: Vec<_> = unprocessed_packets @@ -91,7 +91,7 @@ impl BankingStage { let blobs = packet::packets_to_blobs(&packets); for blob in blobs { - socket.send_to(&blob.data[..blob.meta.size], forwarder)?; + socket.send_to(&blob.data[..blob.meta.size], tpu_via_blobs)?; } Ok(()) @@ -118,7 +118,7 @@ impl BankingStage { if forward { let _ = Self::forward_unprocessed_packets( &socket, - &rcluster_info.leader_data().unwrap().forwarder, + &rcluster_info.leader_data().unwrap().tpu_via_blobs, &buffered_packets, ); } @@ -173,7 +173,7 @@ impl BankingStage { if let Some(leader) = cluster_info.read().unwrap().leader_data() { let _ = Self::forward_unprocessed_packets( &socket, - &leader.forwarder, + &leader.tpu_via_blobs, &unprocessed_packets, ); } diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index d4e88cfaa4..5a463b4629 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1341,7 +1341,7 @@ pub struct Sockets { pub gossip: UdpSocket, pub tvu: Vec, pub tpu: Vec, - pub forwarder: Vec, + pub tpu_via_blobs: Vec, pub broadcast: UdpSocket, pub repair: UdpSocket, pub retransmit: UdpSocket, @@ -1362,7 +1362,7 @@ impl Node { let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); let tvu = UdpSocket::bind("127.0.0.1:0").unwrap(); - let forwarder = UdpSocket::bind("127.0.0.1:0").unwrap(); + let tpu_via_blobs = UdpSocket::bind("127.0.0.1:0").unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let rpc_port = find_available_port_in_range((1024, 65535)).unwrap(); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_port); @@ -1378,7 +1378,7 @@ impl Node { gossip.local_addr().unwrap(), tvu.local_addr().unwrap(), tpu.local_addr().unwrap(), - forwarder.local_addr().unwrap(), + tpu_via_blobs.local_addr().unwrap(), storage.local_addr().unwrap(), rpc_addr, rpc_pubsub_addr, @@ -1390,7 +1390,7 @@ impl Node { gossip, tvu: vec![tvu], tpu: vec![tpu], - forwarder: vec![forwarder], + tpu_via_blobs: vec![tpu_via_blobs], broadcast, repair, retransmit, @@ -1419,7 +1419,7 @@ impl Node { let (tpu_port, tpu_sockets) = multi_bind_in_range(FULLNODE_PORT_RANGE, 32).expect("tpu multi_bind"); - let (forwarder_port, forwarder_sockets) = + let (tpu_via_blobs_port, tpu_via_blobs_sockets) = multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tpu multi_bind"); let (_, repair) = bind(); @@ -1432,7 +1432,7 @@ impl Node { SocketAddr::new(gossip_addr.ip(), gossip_port), SocketAddr::new(gossip_addr.ip(), tvu_port), SocketAddr::new(gossip_addr.ip(), tpu_port), - SocketAddr::new(gossip_addr.ip(), forwarder_port), + SocketAddr::new(gossip_addr.ip(), tpu_via_blobs_port), SocketAddr::new(gossip_addr.ip(), storage_port), SocketAddr::new(gossip_addr.ip(), RPC_PORT), SocketAddr::new(gossip_addr.ip(), RPC_PORT + 1), @@ -1446,7 +1446,7 @@ impl Node { gossip, tvu: tvu_sockets, tpu: tpu_sockets, - forwarder: forwarder_sockets, + tpu_via_blobs: tpu_via_blobs_sockets, broadcast, repair, retransmit, diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index 5c2cf69844..3550f6212c 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -18,8 +18,8 @@ pub struct ContactInfo { pub tvu: SocketAddr, /// transactions address pub tpu: SocketAddr, - // forwarder address - pub forwarder: SocketAddr, + /// address to forward unprocessed transactions to + pub tpu_via_blobs: SocketAddr, /// storage data address pub storage_addr: SocketAddr, /// address to which to send JSON-RPC requests @@ -74,7 +74,7 @@ impl Default for ContactInfo { gossip: socketaddr_any!(), tvu: socketaddr_any!(), tpu: socketaddr_any!(), - forwarder: socketaddr_any!(), + tpu_via_blobs: socketaddr_any!(), storage_addr: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), @@ -90,7 +90,7 @@ impl ContactInfo { gossip: SocketAddr, tvu: SocketAddr, tpu: SocketAddr, - forwarder: SocketAddr, + tpu_via_blobs: SocketAddr, storage_addr: SocketAddr, rpc: SocketAddr, rpc_pubsub: SocketAddr, @@ -102,7 +102,7 @@ impl ContactInfo { gossip, tvu, tpu, - forwarder, + tpu_via_blobs, storage_addr, rpc, rpc_pubsub, @@ -150,7 +150,7 @@ impl ContactInfo { let tpu_addr = *bind_addr; let gossip_addr = Self::next_port(&bind_addr, 1); let tvu_addr = Self::next_port(&bind_addr, 2); - let forwarder_addr = Self::next_port(&bind_addr, 3); + let tpu_via_blobs_addr = Self::next_port(&bind_addr, 3); let rpc_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT); let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), RPC_PORT + 1); Self::new( @@ -158,7 +158,7 @@ impl ContactInfo { gossip_addr, tvu_addr, tpu_addr, - forwarder_addr, + tpu_via_blobs_addr, "0.0.0.0:0".parse().unwrap(), rpc_addr, rpc_pubsub_addr, @@ -261,7 +261,7 @@ mod tests { let ci = ContactInfo::default(); assert!(ci.gossip.ip().is_unspecified()); assert!(ci.tvu.ip().is_unspecified()); - assert!(ci.forwarder.ip().is_unspecified()); + assert!(ci.tpu_via_blobs.ip().is_unspecified()); assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); @@ -272,7 +272,7 @@ mod tests { let ci = ContactInfo::new_multicast(); assert!(ci.gossip.ip().is_multicast()); assert!(ci.tvu.ip().is_multicast()); - assert!(ci.forwarder.ip().is_multicast()); + assert!(ci.tpu_via_blobs.ip().is_multicast()); assert!(ci.rpc.ip().is_multicast()); assert!(ci.rpc_pubsub.ip().is_multicast()); assert!(ci.tpu.ip().is_multicast()); @@ -284,7 +284,7 @@ mod tests { let ci = ContactInfo::new_gossip_entry_point(&addr); assert_eq!(ci.gossip, addr); assert!(ci.tvu.ip().is_unspecified()); - assert!(ci.forwarder.ip().is_unspecified()); + assert!(ci.tpu_via_blobs.ip().is_unspecified()); assert!(ci.rpc.ip().is_unspecified()); assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); @@ -297,7 +297,7 @@ mod tests { assert_eq!(ci.tpu, addr); assert_eq!(ci.gossip.port(), 11); assert_eq!(ci.tvu.port(), 12); - assert_eq!(ci.forwarder.port(), 13); + assert_eq!(ci.tpu_via_blobs.port(), 13); assert_eq!(ci.rpc.port(), 8899); assert_eq!(ci.rpc_pubsub.port(), 8900); assert!(ci.storage_addr.ip().is_unspecified()); @@ -312,7 +312,7 @@ mod tests { assert_eq!(d1.id, keypair.pubkey()); assert_eq!(d1.gossip, socketaddr!("127.0.0.1:1235")); assert_eq!(d1.tvu, socketaddr!("127.0.0.1:1236")); - assert_eq!(d1.forwarder, socketaddr!("127.0.0.1:1237")); + assert_eq!(d1.tpu_via_blobs, socketaddr!("127.0.0.1:1237")); assert_eq!(d1.tpu, socketaddr!("127.0.0.1:1234")); assert_eq!(d1.rpc, socketaddr!("127.0.0.1:8899")); assert_eq!(d1.rpc_pubsub, socketaddr!("127.0.0.1:8900")); diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index e40588376b..bb7391303e 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -16,29 +16,29 @@ impl FetchStage { #[allow(clippy::new_ret_no_self)] pub fn new( sockets: Vec, - forwarder_sockets: Vec, + tpu_via_blobs_sockets: Vec, exit: &Arc, ) -> (Self, PacketReceiver) { let (sender, receiver) = channel(); ( - Self::new_with_sender(sockets, forwarder_sockets, exit, &sender), + Self::new_with_sender(sockets, tpu_via_blobs_sockets, exit, &sender), receiver, ) } pub fn new_with_sender( sockets: Vec, - forwarder_sockets: Vec, + tpu_via_blobs_sockets: Vec, exit: &Arc, sender: &PacketSender, ) -> Self { let tx_sockets = sockets.into_iter().map(Arc::new).collect(); - let forwarder_sockets = forwarder_sockets.into_iter().map(Arc::new).collect(); - Self::new_multi_socket(tx_sockets, forwarder_sockets, exit, &sender) + let tpu_via_blobs_sockets = tpu_via_blobs_sockets.into_iter().map(Arc::new).collect(); + Self::new_multi_socket(tx_sockets, tpu_via_blobs_sockets, exit, &sender) } fn new_multi_socket( sockets: Vec>, - forwarder_sockets: Vec>, + tpu_via_blobs_sockets: Vec>, exit: &Arc, sender: &PacketSender, ) -> Self { @@ -46,11 +46,11 @@ impl FetchStage { .into_iter() .map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage")); - let forwarder_threads = forwarder_sockets + let tpu_via_blobs_threads = tpu_via_blobs_sockets .into_iter() .map(|socket| streamer::blob_packet_receiver(socket, &exit, sender.clone())); - let thread_hdls: Vec<_> = tpu_threads.chain(forwarder_threads).collect(); + let thread_hdls: Vec<_> = tpu_threads.chain(tpu_via_blobs_threads).collect(); Self { thread_hdls } } } diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index 99ab9d6e99..f3d7fc6329 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -220,7 +220,7 @@ impl Fullnode { &poh_recorder, entry_receiver, node.sockets.tpu, - node.sockets.forwarder, + node.sockets.tpu_via_blobs, node.sockets.broadcast, config.sigverify_disabled, &blocktree, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 49e4494c68..c3011b6845 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -33,7 +33,7 @@ impl Tpu { poh_recorder: &Arc>, entry_receiver: Receiver, transactions_sockets: Vec, - forwarder_sockets: Vec, + tpu_via_blobs_sockets: Vec, broadcast_socket: UdpSocket, sigverify_disabled: bool, blocktree: &Arc, @@ -44,7 +44,7 @@ impl Tpu { let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( transactions_sockets, - forwarder_sockets, + tpu_via_blobs_sockets, &exit, &packet_sender.clone(), );