diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index cebff05ad..3d12b8f0f 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1507,6 +1507,7 @@ impl ClusterInfo { daddr, daddr, daddr, + daddr, timestamp(), ); (node, gossip_socket, Some(ip_echo)) @@ -1527,6 +1528,7 @@ impl ClusterInfo { daddr, daddr, daddr, + daddr, timestamp(), ); (node, gossip_socket, None) @@ -1610,6 +1612,7 @@ impl Node { gossip.local_addr().unwrap(), tvu.local_addr().unwrap(), tvu_forwards.local_addr().unwrap(), + repair.local_addr().unwrap(), empty, empty, storage.local_addr().unwrap(), @@ -1658,6 +1661,7 @@ impl Node { tvu_forwards.local_addr().unwrap(), tpu.local_addr().unwrap(), tpu_forwards.local_addr().unwrap(), + repair.local_addr().unwrap(), storage.local_addr().unwrap(), rpc_addr, rpc_pubsub_addr, @@ -1717,7 +1721,7 @@ impl Node { let (_, retransmit_sockets) = multi_bind_in_range(port_range, 8).expect("retransmit multi_bind"); - let (_, repair) = Self::bind(port_range); + let (repair_port, repair) = Self::bind(port_range); let (_, broadcast) = Self::bind(port_range); let info = ContactInfo::new( @@ -1725,6 +1729,7 @@ impl Node { SocketAddr::new(gossip_addr.ip(), gossip_port), SocketAddr::new(gossip_addr.ip(), tvu_port), SocketAddr::new(gossip_addr.ip(), tvu_forwards_port), + SocketAddr::new(gossip_addr.ip(), repair_port), SocketAddr::new(gossip_addr.ip(), tpu_port), SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), socketaddr_any!(), @@ -1882,6 +1887,7 @@ mod tests { socketaddr!([127, 0, 0, 1], 1239), socketaddr!([127, 0, 0, 1], 1240), socketaddr!([127, 0, 0, 1], 1241), + socketaddr!([127, 0, 0, 1], 1242), 0, ); cluster_info.insert_info(nxt.clone()); @@ -1902,6 +1908,7 @@ mod tests { socketaddr!([127, 0, 0, 1], 1239), socketaddr!([127, 0, 0, 1], 1240), socketaddr!([127, 0, 0, 1], 1241), + socketaddr!([127, 0, 0, 1], 1242), 0, ); cluster_info.insert_info(nxt); @@ -1939,6 +1946,7 @@ mod tests { socketaddr!("127.0.0.1:1239"), socketaddr!("127.0.0.1:1240"), socketaddr!("127.0.0.1:1241"), + socketaddr!("127.0.0.1:1242"), 0, ); let rv = ClusterInfo::run_window_request( diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs index 60507b276..2001580d4 100644 --- a/core/src/cluster_info_repair_listener.rs +++ b/core/src/cluster_info_repair_listener.rs @@ -210,13 +210,13 @@ impl ClusterInfoRepairListener { for (repairee_pubkey, repairee_epoch_slots) in repairees { let repairee_root = repairee_epoch_slots.root; - let repairee_tvu = { + let repairee_repair_addr = { let r_cluster_info = cluster_info.read().unwrap(); let contact_info = r_cluster_info.get_contact_info_for_node(repairee_pubkey); - contact_info.map(|c| c.tvu) + contact_info.map(|c| c.repair) }; - if let Some(repairee_tvu) = repairee_tvu { + if let Some(repairee_addr) = repairee_repair_addr { // For every repairee, get the set of repairmen who are responsible for let mut eligible_repairmen = Self::find_eligible_repairmen( my_pubkey, @@ -242,7 +242,7 @@ impl ClusterInfoRepairListener { &repairee_epoch_slots, &eligible_repairmen, socket, - &repairee_tvu, + &repairee_addr, NUM_SLOTS_PER_UPDATE, epoch_schedule, ); @@ -261,7 +261,7 @@ impl ClusterInfoRepairListener { repairee_epoch_slots: &EpochSlots, eligible_repairmen: &[&Pubkey], socket: &UdpSocket, - repairee_tvu: &SocketAddr, + repairee_addr: &SocketAddr, num_slots_to_repair: usize, epoch_schedule: &EpochSchedule, ) -> Result<()> { @@ -320,7 +320,7 @@ impl ClusterInfoRepairListener { .get_data_shred(slot, blob_index as u64) .expect("Failed to read data blob from blocktree") { - socket.send_to(&blob_data[..], repairee_tvu)?; + socket.send_to(&blob_data[..], repairee_addr)?; total_data_blobs_sent += 1; } @@ -328,7 +328,7 @@ impl ClusterInfoRepairListener { .get_coding_shred(slot, blob_index as u64) .expect("Failed to read coding blob from blocktree") { - socket.send_to(&coding_bytes[..], repairee_tvu)?; + socket.send_to(&coding_bytes[..], repairee_addr)?; total_coding_blobs_sent += 1; } } diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index 1d0ca64f4..b2895825d 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -20,8 +20,10 @@ pub struct ContactInfo { pub gossip: SocketAddr, /// address to connect to for replication pub tvu: SocketAddr, - /// address to forward blobs to + /// address to forward shreds to pub tvu_forwards: SocketAddr, + /// address to send repairs to + pub repair: SocketAddr, /// transactions address pub tpu: SocketAddr, /// address to forward unprocessed transactions to @@ -80,6 +82,7 @@ impl Default for ContactInfo { gossip: socketaddr_any!(), tvu: socketaddr_any!(), tvu_forwards: socketaddr_any!(), + repair: socketaddr_any!(), tpu: socketaddr_any!(), tpu_forwards: socketaddr_any!(), storage_addr: socketaddr_any!(), @@ -98,6 +101,7 @@ impl ContactInfo { gossip: SocketAddr, tvu: SocketAddr, tvu_forwards: SocketAddr, + repair: SocketAddr, tpu: SocketAddr, tpu_forwards: SocketAddr, storage_addr: SocketAddr, @@ -111,6 +115,7 @@ impl ContactInfo { gossip, tvu, tvu_forwards, + repair, tpu, tpu_forwards, storage_addr, @@ -131,6 +136,7 @@ impl ContactInfo { socketaddr!("127.0.0.1:1239"), socketaddr!("127.0.0.1:1240"), socketaddr!("127.0.0.1:1241"), + socketaddr!("127.0.0.1:1242"), now, ) } @@ -150,6 +156,7 @@ impl ContactInfo { addr, addr, addr, + addr, 0, ) } @@ -167,6 +174,7 @@ impl ContactInfo { let tvu_addr = next_port(&bind_addr, 2); let tpu_forwards_addr = next_port(&bind_addr, 3); let tvu_forwards_addr = next_port(&bind_addr, 4); + let repair = next_port(&bind_addr, 5); let rpc_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); Self::new( @@ -174,6 +182,7 @@ impl ContactInfo { gossip_addr, tvu_addr, tvu_forwards_addr, + repair, tpu_addr, tpu_forwards_addr, "0.0.0.0:0".parse().unwrap(), @@ -202,6 +211,7 @@ impl ContactInfo { daddr, daddr, daddr, + daddr, timestamp(), ) } @@ -245,6 +255,7 @@ impl Signable for ContactInfo { tvu: SocketAddr, tpu: SocketAddr, tpu_forwards: SocketAddr, + repair: SocketAddr, storage_addr: SocketAddr, rpc: SocketAddr, rpc_pubsub: SocketAddr, @@ -259,6 +270,7 @@ impl Signable for ContactInfo { tpu: me.tpu, storage_addr: me.storage_addr, tpu_forwards: me.tpu_forwards, + repair: me.repair, rpc: me.rpc, rpc_pubsub: me.rpc_pubsub, wallclock: me.wallclock,