Ensure AncestorHashesService selects an open port (#21919)

carllin authored 2021-12-18 00:44:01 -05:00, committed by GitHub
parent 97a1fa10a6
commit 7f6fb6937a
6 changed files with 30 additions and 4 deletions
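Summary of the change: previously RepairService bound the AncestorHashesService request socket itself with `UdpSocket::bind("0.0.0.0:0")`, so the OS picked an arbitrary ephemeral port that need not fall inside the validator's configured port range, and hence need not be a port the operator has opened. `Node::new_with_external_ip` now binds this socket inside the validator port range and threads it down through Tvu → RetransmitStage → WindowService → RepairService (localhost test nodes still use an ephemeral bind). A std-only sketch of why `"0.0.0.0:0"` is the problem:

```rust
use std::net::UdpSocket;

fn main() -> std::io::Result<()> {
    // Port 0 means "let the OS choose": each bind can land on a different,
    // unpredictable ephemeral port, with no relation to the ports a
    // validator operator has actually opened.
    let a = UdpSocket::bind("0.0.0.0:0")?;
    let b = UdpSocket::bind("0.0.0.0:0")?;
    println!("{} vs {}", a.local_addr()?.port(), b.local_addr()?.port());
    Ok(())
}
```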

core/src/repair_service.rs

@@ -201,6 +201,7 @@ impl RepairService {
blockstore: Arc<Blockstore>,
exit: Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
+ancestor_hashes_socket: Arc<UdpSocket>,
repair_info: RepairInfo,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
@@ -225,11 +226,10 @@ impl RepairService {
.unwrap()
};
-let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
let ancestor_hashes_service = AncestorHashesService::new(
exit,
blockstore,
-ancestor_hashes_request_socket,
+ancestor_hashes_socket,
repair_info,
ancestor_hashes_replay_update_receiver,
);
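The socket arrives as `Arc<UdpSocket>` rather than a bare `UdpSocket`, so the service's threads can share the one bound port simply by cloning the Arc. A minimal illustration of that pattern (a toy sketch, not the repo's code):

```rust
use std::{net::UdpSocket, sync::Arc, thread};

fn main() -> std::io::Result<()> {
    // One bound socket, shared across threads via Arc: every clone refers
    // to the same OS socket and therefore the same local port.
    let socket = Arc::new(UdpSocket::bind("127.0.0.1:0")?);
    let receiver = Arc::clone(&socket);
    let handle = thread::spawn(move || {
        let mut buf = [0u8; 32];
        receiver.recv_from(&mut buf)
    });
    // Send to ourselves so the receiving thread has something to read.
    socket.send_to(b"ping", socket.local_addr()?)?;
    let (len, _from) = handle.join().unwrap()?;
    assert_eq!(len, 4);
    Ok(())
}
```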

core/src/retransmit_stage.rs

@@ -433,6 +433,7 @@ impl RetransmitStage {
cluster_info: Arc<ClusterInfo>,
retransmit_sockets: Arc<Vec<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
+ancestor_hashes_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<PacketBatch>>,
exit: Arc<AtomicBool>,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
@@ -486,6 +487,7 @@ impl RetransmitStage {
verified_receiver,
retransmit_sender,
repair_socket,
+ancestor_hashes_socket,
exit,
repair_info,
leader_schedule_cache,

core/src/tvu.rs

@@ -82,6 +82,7 @@ pub struct Sockets {
pub repair: UdpSocket,
pub retransmit: Vec<UdpSocket>,
pub forwards: Vec<UdpSocket>,
+pub ancestor_hashes_requests: UdpSocket,
}
#[derive(Default)]
@@ -148,11 +149,13 @@ impl Tvu {
fetch: fetch_sockets,
retransmit: retransmit_sockets,
forwards: tvu_forward_sockets,
+ancestor_hashes_requests: ancestor_hashes_socket,
} = sockets;
let (fetch_sender, fetch_receiver) = channel();
let repair_socket = Arc::new(repair_socket);
+let ancestor_hashes_socket = Arc::new(ancestor_hashes_socket);
let fetch_sockets: Vec<Arc<UdpSocket>> = fetch_sockets.into_iter().map(Arc::new).collect();
let forward_sockets: Vec<Arc<UdpSocket>> =
tvu_forward_sockets.into_iter().map(Arc::new).collect();
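Worth noting in the destructuring above: `ancestor_hashes_requests: ancestor_hashes_socket` is Rust's field-rename pattern, which moves the field out of `sockets` under a new local name. A standalone sketch:

```rust
use std::net::UdpSocket;

struct Sockets {
    ancestor_hashes_requests: UdpSocket,
}

fn main() -> std::io::Result<()> {
    let sockets = Sockets {
        ancestor_hashes_requests: UdpSocket::bind("127.0.0.1:0")?,
    };
    // `field: new_name` binds the field to a local with a different name.
    let Sockets {
        ancestor_hashes_requests: ancestor_hashes_socket,
    } = sockets;
    println!("{}", ancestor_hashes_socket.local_addr()?);
    Ok(())
}
```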
@@ -187,6 +190,7 @@ impl Tvu {
cluster_info.clone(),
Arc::new(retransmit_sockets),
repair_socket,
+ancestor_hashes_socket,
verified_receiver,
exit.clone(),
cluster_slots_update_receiver,
@@ -461,6 +465,7 @@ pub mod tests {
retransmit: target1.sockets.retransmit_sockets,
fetch: target1.sockets.tvu,
forwards: target1.sockets.tvu_forwards,
+ancestor_hashes_requests: target1.sockets.ancestor_hashes_requests,
}
},
blockstore,

core/src/validator.rs

@@ -836,6 +836,11 @@ impl Validator {
.iter()
.map(|s| s.try_clone().expect("Failed to clone TVU forwards Sockets"))
.collect(),
+ancestor_hashes_requests: node
+    .sockets
+    .ancestor_hashes_requests
+    .try_clone()
+    .expect("Failed to clone ancestor_hashes_requests socket"),
},
blockstore.clone(),
ledger_signal_receiver,
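Here the validator duplicates the node's socket with `try_clone()` instead of wrapping it in an Arc: the clone is a second OS handle to the same underlying socket, so both sides keep the same local port. A sketch of that guarantee:

```rust
use std::net::UdpSocket;

fn main() -> std::io::Result<()> {
    let original = UdpSocket::bind("127.0.0.1:0")?;
    // try_clone() duplicates the OS handle: two values, one underlying
    // socket, one local port.
    let clone = original.try_clone()?;
    assert_eq!(original.local_addr()?, clone.local_addr()?);
    Ok(())
}
```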

core/src/window_service.rs

@@ -458,6 +458,7 @@ impl WindowService {
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
retransmit_sender: Sender<Vec<Shred>>,
repair_socket: Arc<UdpSocket>,
+ancestor_hashes_socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
repair_info: RepairInfo,
leader_schedule_cache: Arc<LeaderScheduleCache>,
@@ -483,6 +484,7 @@ impl WindowService {
blockstore.clone(),
exit.clone(),
repair_socket,
+ancestor_hashes_socket,
repair_info,
verified_vote_receiver,
outstanding_requests.clone(),

gossip/src/cluster_info.rs

@@ -94,7 +94,7 @@ use {
};
pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
-pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 10; // VALIDATOR_PORT_RANGE must be at least this wide
+pub const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 11; // VALIDATOR_PORT_RANGE must be at least this wide
/// The Data plane fanout size, also used as the neighborhood size
pub const DATA_PLANE_FANOUT: usize = 200;
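The width bump from 10 to 11 follows directly from the new socket: the node now binds one more UDP port inside VALIDATOR_PORT_RANGE (ancestor_hashes_requests), so a conforming range must hold at least 11 ports. A sanity-check sketch mirroring the constants above:

```rust
// Mirrors the constants above: the configured range must be wide enough
// for every socket the node binds in-range; ancestor_hashes_requests is
// the new, 11th consumer.
const VALIDATOR_PORT_RANGE: (u16, u16) = (8000, 10_000);
const MINIMUM_VALIDATOR_PORT_RANGE_WIDTH: u16 = 11;

fn main() {
    let width = VALIDATOR_PORT_RANGE.1 - VALIDATOR_PORT_RANGE.0;
    assert!(width >= MINIMUM_VALIDATOR_PORT_RANGE_WIDTH);
}
```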
@@ -2742,6 +2742,7 @@ pub struct Sockets {
pub repair: UdpSocket,
pub retransmit_sockets: Vec<UdpSocket>,
pub serve_repair: UdpSocket,
pub ancestor_hashes_requests: UdpSocket,
}
#[derive(Debug)]
@@ -2775,6 +2776,8 @@ impl Node {
let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap();
+let ancestor_hashes_requests = UdpSocket::bind("0.0.0.0:0").unwrap();
let info = ContactInfo {
id: *pubkey,
gossip: gossip_addr,
@@ -2804,6 +2807,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
serve_repair,
+ancestor_hashes_requests,
},
}
}
@@ -2845,6 +2849,7 @@ impl Node {
let (repair_port, repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
let (_, broadcast) = Self::bind(bind_ip_addr, port_range);
+let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);
let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
@@ -2880,6 +2885,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
serve_repair,
+ancestor_hashes_requests,
},
}
}
@@ -2917,6 +2923,8 @@ impl Node {
let (_, broadcast) =
multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind");
+let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);
let info = ContactInfo {
id: *pubkey,
gossip: SocketAddr::new(gossip_addr.ip(), gossip_port),
@@ -2948,6 +2956,7 @@ impl Node {
retransmit_sockets,
serve_repair,
ip_echo: Some(ip_echo),
+ancestor_hashes_requests,
},
}
}
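Every `Self::bind(bind_ip_addr, port_range)` call above claims an open port inside the configured range, failing loudly if none is free rather than silently spilling outside it. A std-only sketch of that behavior (the repo's real helpers live in solana-net-utils; the names here are illustrative):

```rust
use std::io;
use std::net::{IpAddr, Ipv4Addr, UdpSocket};

// Sketch of in-range binding: scan the range and return the first port
// the OS lets us bind, or an error if every port is taken.
fn bind_in_range(ip: IpAddr, (start, end): (u16, u16)) -> io::Result<(u16, UdpSocket)> {
    for port in start..end {
        if let Ok(socket) = UdpSocket::bind((ip, port)) {
            return Ok((port, socket));
        }
    }
    Err(io::Error::new(io::ErrorKind::AddrInUse, "no open port in range"))
}

fn main() -> io::Result<()> {
    let ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
    let (port, _socket) = bind_in_range(ip, (8000, 10_000))?;
    println!("bound ancestor_hashes_requests-style socket on {port}");
    Ok(())
}
```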
@@ -3490,7 +3499,10 @@ mod tests {
fn new_with_external_ip_test_gossip() {
// Can't use VALIDATOR_PORT_RANGE because if this test runs in parallel with others, the
// port returned by `bind_in_range()` might be snatched up before `Node::new_with_external_ip()` runs
-let port_range = (VALIDATOR_PORT_RANGE.1 + 10, VALIDATOR_PORT_RANGE.1 + 20);
+let port_range = (
+    VALIDATOR_PORT_RANGE.1 + MINIMUM_VALIDATOR_PORT_RANGE_WIDTH,
+    VALIDATOR_PORT_RANGE.1 + (2 * MINIMUM_VALIDATOR_PORT_RANGE_WIDTH),
+);
let ip = IpAddr::V4(Ipv4Addr::from(0));
let port = bind_in_range(ip, port_range).expect("Failed to bind").0;
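The test's window above VALIDATOR_PORT_RANGE was hard-coded to 10 ports; with 11 in-range sockets that is now too narrow, so it is rewritten in terms of MINIMUM_VALIDATOR_PORT_RANGE_WIDTH and stays correct if the constant grows again. Concretely:

```rust
fn main() {
    // With VALIDATOR_PORT_RANGE = (8000, 10_000) and a minimum width of
    // 11, the test's range is (10_011, 10_022): disjoint from the
    // validator range (so parallel tests can't steal its ports) and
    // exactly one minimum-width wide.
    let port_range = (10_000u16 + 11, 10_000u16 + 2 * 11);
    assert_eq!(port_range, (10_011, 10_022));
}
```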