diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 37326d60a..6469a03db 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1362,11 +1362,39 @@ impl Node { Self::new_localhost_with_pubkey(&pubkey) } pub fn new_localhost_replicator(pubkey: &Pubkey) -> Self { - let mut new = Self::new_localhost_with_pubkey(pubkey); + let gossip = UdpSocket::bind("127.0.0.1:0").unwrap(); + let tvu = UdpSocket::bind("127.0.0.1:0").unwrap(); let storage = UdpSocket::bind("127.0.0.1:0").unwrap(); - new.info.storage_addr = storage.local_addr().unwrap(); - new.sockets.storage = Some(storage); - new + let empty = "0.0.0.0:0".parse().unwrap(); + let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); + + let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); + let info = ContactInfo::new( + pubkey, + gossip.local_addr().unwrap(), + tvu.local_addr().unwrap(), + empty, + empty, + storage.local_addr().unwrap(), + empty, + empty, + timestamp(), + ); + + Node { + info, + sockets: Sockets { + gossip, + tvu: vec![tvu], + tpu: vec![], + tpu_via_blobs: vec![], + broadcast, + repair, + retransmit, + storage: Some(storage), + }, + } } pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self { let tpu = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -1408,12 +1436,8 @@ impl Node { }, } } - pub fn new_with_external_ip(pubkey: &Pubkey, gossip_addr: &SocketAddr) -> Node { - fn bind() -> (u16, UdpSocket) { - bind_in_range(FULLNODE_PORT_RANGE).expect("Failed to bind") - }; - - let (gossip_port, gossip) = if gossip_addr.port() != 0 { + fn get_gossip_port(gossip_addr: &SocketAddr) -> (u16, UdpSocket) { + if gossip_addr.port() != 0 { ( gossip_addr.port(), bind_to(gossip_addr.port(), false).unwrap_or_else(|e| { @@ -1421,8 +1445,14 @@ impl Node { }), ) } else { - bind() - }; + Self::bind() + } + } + fn bind() -> (u16, UdpSocket) { + bind_in_range(FULLNODE_PORT_RANGE).expect("Failed to bind") + } + pub fn new_with_external_ip(pubkey: &Pubkey, gossip_addr: &SocketAddr) -> Node { + let (gossip_port, gossip) = Self::get_gossip_port(gossip_addr); let (tvu_port, tvu_sockets) = multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tvu multi_bind"); @@ -1433,9 +1463,9 @@ impl Node { let (tpu_via_blobs_port, tpu_via_blobs_sockets) = multi_bind_in_range(FULLNODE_PORT_RANGE, 8).expect("tpu multi_bind"); - let (_, repair) = bind(); - let (_, broadcast) = bind(); - let (_, retransmit) = bind(); + let (_, repair) = Self::bind(); + let (_, broadcast) = Self::bind(); + let (_, retransmit) = Self::bind(); let info = ContactInfo::new( pubkey, @@ -1464,6 +1494,21 @@ impl Node { }, } } + pub fn new_replicator_with_external_ip(pubkey: &Pubkey, gossip_addr: &SocketAddr) -> Node { + let mut new = Self::new_with_external_ip(pubkey, gossip_addr); + let (storage_port, storage_socket) = Self::bind(); + + new.info.storage_addr = SocketAddr::new(gossip_addr.ip(), storage_port); + new.sockets.storage = Some(storage_socket); + + let empty = "0.0.0.0:0".parse().unwrap(); + new.info.tpu = empty; + new.info.tpu_via_blobs = empty; + new.sockets.tpu = vec![]; + new.sockets.tpu_via_blobs = vec![]; + + new + } } fn report_time_spent(label: &str, time: &Duration, extra: &str) { @@ -1758,6 +1803,19 @@ mod tests { assert_eq!(node.sockets.gossip.local_addr().unwrap().port(), 8050); } + #[test] + fn new_replicator_external_ip_test() { + let ip = IpAddr::V4(Ipv4Addr::from(0)); + let node = + Node::new_replicator_with_external_ip(&Keypair::new().pubkey(), &socketaddr!(0, 8050)); + + check_socket(&node.sockets.storage.unwrap(), ip, FULLNODE_PORT_RANGE); + check_socket(&node.sockets.gossip, ip, FULLNODE_PORT_RANGE); + check_socket(&node.sockets.repair, ip, FULLNODE_PORT_RANGE); + + check_sockets(&node.sockets.tvu, ip, FULLNODE_PORT_RANGE); + } + //test that all cluster_info objects only generate signed messages //when constructed with keypairs #[test] diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 6e64be1f0..9d80fad6c 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -41,7 +41,7 @@ use std::sync::{Arc, RwLock}; use std::thread::sleep; use std::thread::spawn; use std::thread::JoinHandle; -use std::time::{Duration, Instant}; +use std::time::Duration; #[derive(Serialize, Deserialize)] pub enum ReplicatorRequest { @@ -179,20 +179,13 @@ impl Replicator { #[allow(clippy::new_ret_no_self)] pub fn new( ledger_path: &str, - mut node: Node, + node: Node, cluster_entrypoint: ContactInfo, keypair: Arc, _timeout: Option, ) -> Result { let exit = Arc::new(AtomicBool::new(false)); - // replicator cannot give information on rpc and - // cannot be leader so tpu/rpc ports are cleared - node.info.rpc = "0.0.0.0:0".parse().unwrap(); - node.info.rpc_pubsub = "0.0.0.0:0".parse().unwrap(); - node.info.tpu = "0.0.0.0:0".parse().unwrap(); - node.info.tpu_via_blobs = "0.0.0.0:0".parse().unwrap(); - info!("Replicator: id: {}", keypair.pubkey()); info!("Creating cluster info...."); let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone()); @@ -316,20 +309,17 @@ impl Replicator { cluster_info: &Arc>, ) { info!("window created, waiting for ledger download done"); - let _start = Instant::now(); let mut _received_so_far = 0; let mut current_slot = start_slot; - let mut done = false; - loop { - loop { - if let Ok(meta) = blocktree.meta(current_slot) { - if let Some(meta) = meta { - if meta.is_rooted { - current_slot += 1; - info!("current slot: {}", current_slot); - } else { - break; + 'outer: loop { + while let Ok(meta) = blocktree.meta(current_slot) { + if let Some(meta) = meta { + if meta.is_rooted { + current_slot += 1; + warn!("current slot: {}", current_slot); + if current_slot >= start_slot + ENTRIES_PER_SEGMENT { + break 'outer; } } else { break; @@ -337,17 +327,7 @@ impl Replicator { } else { break; } - if current_slot >= start_slot + ENTRIES_PER_SEGMENT { - info!("current slot: {} start: {}", current_slot, start_slot); - done = true; - break; - } } - - if done { - break; - } - if exit.load(Ordering::Relaxed) { break; } diff --git a/replicator/src/main.rs b/replicator/src/main.rs index 7b1d9f184..ca1416b83 100644 --- a/replicator/src/main.rs +++ b/replicator/src/main.rs @@ -67,7 +67,7 @@ fn main() { } addr }; - let node = Node::new_with_external_ip(&keypair.pubkey(), &gossip_addr); + let node = Node::new_replicator_with_external_ip(&keypair.pubkey(), &gossip_addr); println!( "replicating the data with keypair={:?} gossip_addr={:?}",