diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index df5e25e70..d87430423 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -201,6 +201,7 @@ impl RepairService { blockstore: Arc, exit: Arc, repair_socket: Arc, + ancestor_hashes_socket: Arc, repair_info: RepairInfo, verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: Arc>, @@ -225,11 +226,10 @@ impl RepairService { .unwrap() }; - let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap()); let ancestor_hashes_service = AncestorHashesService::new( exit, blockstore, - ancestor_hashes_request_socket, + ancestor_hashes_socket, repair_info, ancestor_hashes_replay_update_receiver, ); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 5093bb253..3b5e03045 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -433,6 +433,7 @@ impl RetransmitStage { cluster_info: Arc, retransmit_sockets: Arc>, repair_socket: Arc, + ancestor_hashes_socket: Arc, verified_receiver: Receiver>, exit: Arc, cluster_slots_update_receiver: ClusterSlotsUpdateReceiver, @@ -486,6 +487,7 @@ impl RetransmitStage { verified_receiver, retransmit_sender, repair_socket, + ancestor_hashes_socket, exit, repair_info, leader_schedule_cache, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 80c426abd..f04b5300e 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -82,6 +82,7 @@ pub struct Sockets { pub repair: UdpSocket, pub retransmit: Vec, pub forwards: Vec, + pub ancestor_hashes_requests: UdpSocket, } #[derive(Default)] @@ -148,11 +149,13 @@ impl Tvu { fetch: fetch_sockets, retransmit: retransmit_sockets, forwards: tvu_forward_sockets, + ancestor_hashes_requests: ancestor_hashes_socket, } = sockets; let (fetch_sender, fetch_receiver) = channel(); let repair_socket = Arc::new(repair_socket); + let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket); let fetch_sockets: Vec> = fetch_sockets.into_iter().map(Arc::new).collect(); let forward_sockets: Vec> = tvu_forward_sockets.into_iter().map(Arc::new).collect(); @@ -187,6 +190,7 @@ impl Tvu { cluster_info.clone(), Arc::new(retransmit_sockets), repair_socket, + ancestor_hashes_socket, verified_receiver, exit.clone(), cluster_slots_update_receiver, @@ -461,6 +465,7 @@ pub mod tests { retransmit: target1.sockets.retransmit_sockets, fetch: target1.sockets.tvu, forwards: target1.sockets.tvu_forwards, + ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests, } }, blockstore, diff --git a/core/src/validator.rs b/core/src/validator.rs index 1628d6225..0d00a782b 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -836,6 +836,11 @@ impl Validator { .iter() .map(|s| s.try_clone().expect("Failed to clone TVU forwards Sockets")) .collect(), + ancestor_hashes_requests: node + .sockets + .ancestor_hashes_requests + .try_clone() + .expect("Failed to clone ancestor_hashes_requests socket"), }, blockstore.clone(), ledger_signal_receiver, diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 7f20dc89c..27ed6be99 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -458,6 +458,7 @@ impl WindowService { verified_receiver: CrossbeamReceiver>, retransmit_sender: Sender>, repair_socket: Arc, + ancestor_hashes_socket: Arc, exit: Arc, repair_info: RepairInfo, leader_schedule_cache: Arc, @@ -483,6 +484,7 @@ impl WindowService { blockstore.clone(), exit.clone(), repair_socket, + ancestor_hashes_socket, repair_info, verified_vote_receiver, outstanding_requests.clone(), diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 9c6fc9dd7..4d2399363 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -94,7 +94,7 @@ use { }; pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); -pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 10; // VALIDATOR_PORT_RANGE must be at least this wide +pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 11; // VALIDATOR_PORT_RANGE must be at least this wide /// The Data plane fanout size, also used as the neighborhood size pub const DATA_PLANE_FANOUT: usize = 200; @@ -2742,6 +2742,7 @@ pub struct Sockets { pub repair: UdpSocket, pub retransmit_sockets: Vec, pub serve_repair: UdpSocket, + pub ancestor_hashes_requests: UdpSocket, } #[derive(Debug)] @@ -2775,6 +2776,8 @@ impl Node { let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); + let ancestor_hashes_requests = UdpSocket::bind("0.0.0.0:0").unwrap(); + let info = ContactInfo { id: *pubkey, gossip: gossip_addr, @@ -2804,6 +2807,7 @@ impl Node { repair, retransmit_sockets: vec![retransmit_socket], serve_repair, + ancestor_hashes_requests, }, } } @@ -2845,6 +2849,7 @@ impl Node { let (repair_port, repair) = Self::bind(bind_ip_addr, port_range); let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range); let (_, broadcast) = Self::bind(bind_ip_addr, port_range); + let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range); let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap(); @@ -2880,6 +2885,7 @@ impl Node { repair, retransmit_sockets: vec![retransmit_socket], serve_repair, + ancestor_hashes_requests, }, } } @@ -2917,6 +2923,8 @@ impl Node { let (_, broadcast) = multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind"); + let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range); + let info = ContactInfo { id: *pubkey, gossip: SocketAddr::new(gossip_addr.ip(), gossip_port), @@ -2948,6 +2956,7 @@ impl Node { retransmit_sockets, serve_repair, ip_echo: Some(ip_echo), + ancestor_hashes_requests, }, } } @@ -3490,7 +3499,10 @@ mod tests { fn new_with_external_ip_test_gossip() { // Can't use VALIDATOR_PORT_RANGE because if this test runs in parallel with others, the // port returned by `bind_in_range()` might be snatched up before `Node::new_with_external_ip()` runs - let port_range = (VALIDATOR_PORT_RANGE.1 + 10, VALIDATOR_PORT_RANGE.1 + 20); + let port_range = ( + VALIDATOR_PORT_RANGE.1 + MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, + VALIDATOR_PORT_RANGE.1 + (2 * MINIMUM_VALIDATOR_PORT_RANGE_WIDTH), + ); let ip = IpAddr::V4(Ipv4Addr::from(0)); let port = bind_in_range(ip, port_range).expect("Failed to bind").0;