diff --git a/archiver-lib/src/archiver.rs b/archiver-lib/src/archiver.rs index 736907006..39fb3289b 100644 --- a/archiver-lib/src/archiver.rs +++ b/archiver-lib/src/archiver.rs @@ -16,6 +16,7 @@ use solana_core::{ packet::{limited_deserialize, PACKET_DATA_SIZE}, repair_service, repair_service::{RepairService, RepairSlotRange, RepairStrategy}, + serve_repair::ServeRepair, shred_fetch_stage::ShredFetchStage, sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, storage_stage::NUM_STORAGE_SAMPLES, @@ -195,13 +196,7 @@ impl Archiver { Blockstore::open(ledger_path).expect("Expected to be able to open database ledger"), ); - let gossip_service = GossipService::new( - &cluster_info, - Some(blockstore.clone()), - None, - node.sockets.gossip, - &exit, - ); + let gossip_service = GossipService::new(&cluster_info, None, node.sockets.gossip, &exit); info!("Connecting to the cluster via {:?}", cluster_entrypoint); let (nodes, _) = @@ -814,7 +809,7 @@ impl Archiver { /// It is recommended to use a temporary blockstore for this since the download will not verify /// shreds received and might impact the chaining of shreds across slots pub fn download_from_archiver( - cluster_info: &Arc>, + serve_repair: &ServeRepair, archiver_info: &ContactInfo, blockstore: &Arc, slots_per_segment: u64, @@ -834,10 +829,10 @@ impl Archiver { Recycler::default(), "archiver_reeciver", ); - let id = cluster_info.read().unwrap().id(); + let id = serve_repair.keypair().pubkey(); info!( "Sending repair requests from: {} to: {}", - cluster_info.read().unwrap().my_data().id, + serve_repair.my_info().id, archiver_info.gossip ); let repair_slot_range = RepairSlotRange { @@ -857,9 +852,7 @@ impl Archiver { let reqs: Vec<_> = repairs .into_iter() .filter_map(|repair_request| { - cluster_info - .read() - .unwrap() + serve_repair .map_repair_request(&repair_request) .map(|result| ((archiver_info.gossip, result), repair_request)) .ok() diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index afa627f16..411ce83fa 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -21,7 +21,6 @@ use crate::{ crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote}, packet::{Packet, PACKET_DATA_SIZE}, - repair_service::RepairType, result::{Error, Result}, sendmmsg::{multicast, send_mmsg}, weighted_shuffle::{weighted_best, weighted_shuffle}, @@ -29,8 +28,7 @@ use crate::{ use bincode::{serialize, serialized_size}; use core::cmp; use itertools::Itertools; -use rand::{thread_rng, Rng}; -use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore, staking_utils}; +use solana_ledger::{bank_forks::BankForks, staking_utils}; use solana_measure::thread_mem_usage; use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error}; use solana_net_utils::{ @@ -63,15 +61,12 @@ pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000); pub const DATA_PLANE_FANOUT: usize = 200; /// milliseconds we sleep for between gossip requests pub const GOSSIP_SLEEP_MILLIS: u64 = 100; - -/// the number of slots to respond with when responding to `Orphan` requests -pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10; /// The maximum size of a bloom filter -pub const MAX_BLOOM_SIZE: usize = 1028; +pub const MAX_BLOOM_SIZE: usize = 1018; /// The maximum size of a protocol payload const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE; /// The largest protocol header size -const MAX_PROTOCOL_HEADER_SIZE: u64 = 
204; +const MAX_PROTOCOL_HEADER_SIZE: u64 = 214; #[derive(Debug, PartialEq, Eq)] pub enum ClusterInfoError { @@ -174,12 +169,6 @@ enum Protocol { PullResponse(Pubkey, Vec), PushMessage(Pubkey, Vec), PruneMessage(Pubkey, PruneData), - - /// Window protocol messages - /// TODO: move this message to a different module - RequestWindowIndex(ContactInfo, u64, u64), - RequestHighestWindowIndex(ContactInfo, u64, u64), - RequestOrphan(ContactInfo, u64), } impl ClusterInfo { @@ -525,7 +514,7 @@ impl ClusterInfo { } /// all tvu peers with valid gossip addrs that likely have the slot being requested - fn repair_peers(&self, slot: Slot) -> Vec { + pub fn repair_peers(&self, slot: Slot) -> Vec { let me = self.my_data(); ClusterInfo::tvu_peers(self) .into_iter() @@ -866,61 +855,6 @@ impl ClusterInfo { Ok(()) } - pub fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result> { - let req = Protocol::RequestWindowIndex(self.my_data(), slot, shred_index); - let out = serialize(&req)?; - Ok(out) - } - - fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result> { - let req = Protocol::RequestHighestWindowIndex(self.my_data(), slot, shred_index); - let out = serialize(&req)?; - Ok(out) - } - - fn orphan_bytes(&self, slot: Slot) -> Result> { - let req = Protocol::RequestOrphan(self.my_data(), slot); - let out = serialize(&req)?; - Ok(out) - } - - pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec)> { - // find a peer that appears to be accepting replication and has the desired slot, as indicated - // by a valid tvu port location - let valid: Vec<_> = self.repair_peers(repair_request.slot()); - if valid.is_empty() { - return Err(ClusterInfoError::NoPeers.into()); - } - let n = thread_rng().gen::() % valid.len(); - let addr = valid[n].gossip; // send the request to the peer's gossip port - let out = self.map_repair_request(repair_request)?; - - Ok((addr, out)) - } - pub fn map_repair_request(&self, repair_request: &RepairType) -> Result> { - match repair_request { - RepairType::Shred(slot, shred_index) => { - datapoint_debug!( - "cluster_info-repair", - ("repair-slot", *slot, i64), - ("repair-ix", *shred_index, i64) - ); - Ok(self.window_index_request_bytes(*slot, *shred_index)?) - } - RepairType::HighestShred(slot, shred_index) => { - datapoint_debug!( - "cluster_info-repair_highest", - ("repair-highest-slot", *slot, i64), - ("repair-highest-ix", *shred_index, i64) - ); - Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?) - } - RepairType::Orphan(slot) => { - datapoint_debug!("cluster_info-repair_orphan", ("repair-orphan", *slot, i64)); - Ok(self.orphan_bytes(*slot)?) 
- } - } - } // If the network entrypoint hasn't been discovered yet, add it to the crds table fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) { let pull_from_entrypoint = if let Some(entrypoint) = &mut self.entrypoint { @@ -1173,117 +1107,9 @@ impl ClusterInfo { .unwrap() } - fn get_data_shred_as_packet( - blockstore: &Arc, - slot: Slot, - shred_index: u64, - dest: &SocketAddr, - ) -> Result> { - let data = blockstore.get_data_shred(slot, shred_index)?; - Ok(data.map(|data| { - let mut packet = Packet::default(); - packet.meta.size = data.len(); - packet.meta.set_addr(dest); - packet.data.copy_from_slice(&data); - packet - })) - } - - fn run_window_request( - recycler: &PacketsRecycler, - from: &ContactInfo, - from_addr: &SocketAddr, - blockstore: Option<&Arc>, - me: &ContactInfo, - slot: Slot, - shred_index: u64, - ) -> Option { - if let Some(blockstore) = blockstore { - // Try to find the requested index in one of the slots - let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr); - - if let Ok(Some(packet)) = packet { - inc_new_counter_debug!("cluster_info-window-request-ledger", 1); - return Some(Packets::new_with_recycler_data( - recycler, - "run_window_request", - vec![packet], - )); - } - } - - inc_new_counter_debug!("cluster_info-window-request-fail", 1); - trace!( - "{}: failed RequestWindowIndex {} {} {}", - me.id, - from.id, - slot, - shred_index, - ); - - None - } - - fn run_highest_window_request( - recycler: &PacketsRecycler, - from_addr: &SocketAddr, - blockstore: Option<&Arc>, - slot: Slot, - highest_index: u64, - ) -> Option { - let blockstore = blockstore?; - // Try to find the requested index in one of the slots - let meta = blockstore.meta(slot).ok()??; - if meta.received > highest_index { - // meta.received must be at least 1 by this point - let packet = - Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr) - .ok()??; - return Some(Packets::new_with_recycler_data( - recycler, - "run_highest_window_request", - vec![packet], - )); - } - None - } - - fn run_orphan( - recycler: &PacketsRecycler, - from_addr: &SocketAddr, - blockstore: Option<&Arc>, - mut slot: Slot, - max_responses: usize, - ) -> Option { - let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan"); - if let Some(blockstore) = blockstore { - // Try to find the next "n" parent slots of the input slot - while let Ok(Some(meta)) = blockstore.meta(slot) { - if meta.received == 0 { - break; - } - let packet = - Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr); - if let Ok(Some(packet)) = packet { - res.packets.push(packet); - } - if meta.is_parent_set() && res.packets.len() <= max_responses { - slot = meta.parent_slot; - } else { - break; - } - } - } - if res.is_empty() { - return None; - } - Some(res) - } - fn handle_packets( me: &Arc>, recycler: &PacketsRecycler, - blockstore: Option<&Arc>, stakes: &HashMap, packets: Packets, response_sender: &PacketSender, @@ -1390,13 +1216,6 @@ impl ClusterInfo { ("prune_message", (allocated.get() - start) as i64, i64), ); } - _ => { - let rsp = - Self::handle_repair(me, recycler, &from_addr, blockstore, request); - if let Some(rsp) = rsp { - let _ignore_disconnect = response_sender.send(rsp); - } - } }) }); // process the collected pulls together @@ -1524,104 +1343,10 @@ impl ClusterInfo { } } - fn get_repair_sender(request: &Protocol) -> &ContactInfo { - match request { - Protocol::RequestWindowIndex(ref from, _, _) => from, 
- Protocol::RequestHighestWindowIndex(ref from, _, _) => from, - Protocol::RequestOrphan(ref from, _) => from, - _ => panic!("Not a repair request"), - } - } - - fn handle_repair( - me: &Arc>, - recycler: &PacketsRecycler, - from_addr: &SocketAddr, - blockstore: Option<&Arc>, - request: Protocol, - ) -> Option { - let now = Instant::now(); - - //TODO this doesn't depend on cluster_info module, could be moved - //but we are using the listen thread to service these request - //TODO verify from is signed - - let self_id = me.read().unwrap().gossip.id; - let from = Self::get_repair_sender(&request); - if from.id == me.read().unwrap().gossip.id { - warn!( - "{}: Ignored received repair request from ME {}", - self_id, from.id, - ); - inc_new_counter_debug!("cluster_info-handle-repair--eq", 1); - return None; - } - - me.write() - .unwrap() - .gossip - .crds - .update_record_timestamp(&from.id, timestamp()); - let my_info = me.read().unwrap().my_data(); - - let (res, label) = { - match &request { - Protocol::RequestWindowIndex(from, slot, shred_index) => { - inc_new_counter_debug!("cluster_info-request-window-index", 1); - ( - Self::run_window_request( - recycler, - from, - &from_addr, - blockstore, - &my_info, - *slot, - *shred_index, - ), - "RequestWindowIndex", - ) - } - - Protocol::RequestHighestWindowIndex(_, slot, highest_index) => { - inc_new_counter_debug!("cluster_info-request-highest-window-index", 1); - ( - Self::run_highest_window_request( - recycler, - &from_addr, - blockstore, - *slot, - *highest_index, - ), - "RequestHighestWindowIndex", - ) - } - Protocol::RequestOrphan(_, slot) => { - inc_new_counter_debug!("cluster_info-request-orphan", 1); - ( - Self::run_orphan( - recycler, - &from_addr, - blockstore, - *slot, - MAX_ORPHAN_REPAIR_RESPONSES, - ), - "RequestOrphan", - ) - } - _ => panic!("Not a repair request"), - } - }; - - trace!("{}: received repair request: {:?}", self_id, request); - report_time_spent(label, &now.elapsed(), ""); - res - } - /// Process messages from the network fn run_listen( obj: &Arc>, recycler: &PacketsRecycler, - blockstore: Option<&Arc>, bank_forks: Option<&Arc>>, requests_receiver: &PacketReceiver, response_sender: &PacketSender, @@ -1636,12 +1361,11 @@ impl ClusterInfo { None => HashMap::new(), }; - Self::handle_packets(obj, &recycler, blockstore, &stakes, reqs, response_sender); + Self::handle_packets(obj, &recycler, &stakes, reqs, response_sender); Ok(()) } pub fn listen( me: Arc>, - blockstore: Option>, bank_forks: Option>>, requests_receiver: PacketReceiver, response_sender: PacketSender, @@ -1655,7 +1379,6 @@ impl ClusterInfo { let e = Self::run_listen( &me, &recycler, - blockstore.as_ref(), bank_forks.as_ref(), &requests_receiver, &response_sender, @@ -1690,6 +1413,7 @@ impl ClusterInfo { dummy_addr, dummy_addr, dummy_addr, + dummy_addr, timestamp(), ) } @@ -1770,6 +1494,7 @@ pub struct Sockets { pub repair: UdpSocket, pub retransmit_sockets: Vec, pub storage: Option, + pub serve_repair: UdpSocket, } #[derive(Debug)] @@ -1790,9 +1515,10 @@ impl Node { let storage = UdpSocket::bind("127.0.0.1:0").unwrap(); let empty = "0.0.0.0:0".parse().unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); - let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); + let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); + let info = ContactInfo::new( pubkey, gossip.local_addr().unwrap(), @@ -1804,6 +1530,7 @@ impl Node { storage.local_addr().unwrap(), empty, empty, + 
serve_repair.local_addr().unwrap(), timestamp(), ); @@ -1818,6 +1545,7 @@ impl Node { broadcast, repair, retransmit_sockets: vec![retransmit], + serve_repair, storage: Some(storage), ip_echo: None, }, @@ -1840,6 +1568,7 @@ impl Node { let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let storage = UdpSocket::bind("0.0.0.0:0").unwrap(); + let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap(); let info = ContactInfo::new( pubkey, gossip_addr, @@ -1851,6 +1580,7 @@ impl Node { storage.local_addr().unwrap(), rpc_addr, rpc_pubsub_addr, + serve_repair.local_addr().unwrap(), timestamp(), ); Node { @@ -1866,6 +1596,7 @@ impl Node { repair, retransmit_sockets: vec![retransmit_socket], storage: None, + serve_repair, }, } } @@ -1908,6 +1639,8 @@ impl Node { multi_bind_in_range(port_range, 8).expect("retransmit multi_bind"); let (repair_port, repair) = Self::bind(port_range); + let (serve_repair_port, serve_repair) = Self::bind(port_range); + let (_, broadcast) = multi_bind_in_range(port_range, 4).expect("broadcast multi_bind"); let info = ContactInfo::new( @@ -1918,6 +1651,7 @@ impl Node { SocketAddr::new(gossip_addr.ip(), repair_port), SocketAddr::new(gossip_addr.ip(), tpu_port), SocketAddr::new(gossip_addr.ip(), tpu_forwards_port), + SocketAddr::new(gossip_addr.ip(), serve_repair_port), socketaddr_any!(), socketaddr_any!(), socketaddr_any!(), @@ -1937,6 +1671,7 @@ impl Node { repair, retransmit_sockets, storage: None, + serve_repair, ip_echo: Some(ip_echo), }, } @@ -1973,18 +1708,8 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { mod tests { use super::*; use crate::crds_value::CrdsValueLabel; - use crate::repair_service::RepairType; - use crate::result::Error; use rayon::prelude::*; - use solana_ledger::blockstore::make_many_slot_entries; - use solana_ledger::blockstore::Blockstore; - use solana_ledger::blockstore_processor::fill_blockstore_slot_with_ticks; - use solana_ledger::get_tmp_ledger_path; - use solana_ledger::shred::{ - max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, - }; use solana_perf::test_tx::test_tx; - use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::collections::HashSet; use std::net::{IpAddr, Ipv4Addr}; @@ -2055,242 +1780,6 @@ mod tests { let label = CrdsValueLabel::ContactInfo(d.id); assert!(cluster_info.gossip.crds.lookup(&label).is_none()); } - #[test] - fn window_index_request() { - let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); - let mut cluster_info = ClusterInfo::new_with_invalid_keypair(me); - let rv = cluster_info.repair_request(&RepairType::Shred(0, 0)); - assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); - - let gossip_addr = socketaddr!([127, 0, 0, 1], 1234); - let nxt = ContactInfo::new( - &Pubkey::new_rand(), - gossip_addr, - socketaddr!([127, 0, 0, 1], 1235), - socketaddr!([127, 0, 0, 1], 1236), - socketaddr!([127, 0, 0, 1], 1237), - socketaddr!([127, 0, 0, 1], 1238), - socketaddr!([127, 0, 0, 1], 1239), - socketaddr!([127, 0, 0, 1], 1240), - socketaddr!([127, 0, 0, 1], 1241), - socketaddr!([127, 0, 0, 1], 1242), - 0, - ); - cluster_info.insert_info(nxt.clone()); - let rv = cluster_info - .repair_request(&RepairType::Shred(0, 0)) - .unwrap(); - assert_eq!(nxt.gossip, gossip_addr); - assert_eq!(rv.0, nxt.gossip); - - let gossip_addr2 = socketaddr!([127, 0, 0, 2], 1234); - let nxt = ContactInfo::new( - &Pubkey::new_rand(), - gossip_addr2, - 
socketaddr!([127, 0, 0, 1], 1235), - socketaddr!([127, 0, 0, 1], 1236), - socketaddr!([127, 0, 0, 1], 1237), - socketaddr!([127, 0, 0, 1], 1238), - socketaddr!([127, 0, 0, 1], 1239), - socketaddr!([127, 0, 0, 1], 1240), - socketaddr!([127, 0, 0, 1], 1241), - socketaddr!([127, 0, 0, 1], 1242), - 0, - ); - cluster_info.insert_info(nxt); - let mut one = false; - let mut two = false; - while !one || !two { - //this randomly picks an option, so eventually it should pick both - let rv = cluster_info - .repair_request(&RepairType::Shred(0, 0)) - .unwrap(); - if rv.0 == gossip_addr { - one = true; - } - if rv.0 == gossip_addr2 { - two = true; - } - } - assert!(one && two); - } - - /// test window requests respond with the right shred, and do not overrun - #[test] - fn run_window_request() { - let recycler = PacketsRecycler::default(); - solana_logger::setup(); - let ledger_path = get_tmp_ledger_path!(); - { - let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - let me = ContactInfo::new( - &Pubkey::new_rand(), - socketaddr!("127.0.0.1:1234"), - socketaddr!("127.0.0.1:1235"), - socketaddr!("127.0.0.1:1236"), - socketaddr!("127.0.0.1:1237"), - socketaddr!("127.0.0.1:1238"), - socketaddr!("127.0.0.1:1239"), - socketaddr!("127.0.0.1:1240"), - socketaddr!("127.0.0.1:1241"), - socketaddr!("127.0.0.1:1242"), - 0, - ); - let rv = ClusterInfo::run_window_request( - &recycler, - &me, - &socketaddr_any!(), - Some(&blockstore), - &me, - 0, - 0, - ); - assert!(rv.is_none()); - let mut common_header = ShredCommonHeader::default(); - common_header.slot = 2; - common_header.index = 1; - let mut data_header = DataShredHeader::default(); - data_header.parent_offset = 1; - let shred_info = Shred::new_empty_from_header( - common_header, - data_header, - CodingShredHeader::default(), - ); - - blockstore - .insert_shreds(vec![shred_info], None, false) - .expect("Expect successful ledger write"); - - let rv = ClusterInfo::run_window_request( - &recycler, - &me, - &socketaddr_any!(), - Some(&blockstore), - &me, - 2, - 1, - ); - assert!(!rv.is_none()); - let rv: Vec = rv - .expect("packets") - .packets - .into_iter() - .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) - .collect(); - assert_eq!(rv[0].index(), 1); - assert_eq!(rv[0].slot(), 2); - } - - Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); - } - - /// test run_window_requestwindow requests respond with the right shred, and do not overrun - #[test] - fn run_highest_window_request() { - let recycler = PacketsRecycler::default(); - solana_logger::setup(); - let ledger_path = get_tmp_ledger_path!(); - { - let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - let rv = ClusterInfo::run_highest_window_request( - &recycler, - &socketaddr_any!(), - Some(&blockstore), - 0, - 0, - ); - assert!(rv.is_none()); - - let _ = fill_blockstore_slot_with_ticks( - &blockstore, - max_ticks_per_n_shreds(1) + 1, - 2, - 1, - Hash::default(), - ); - - let rv = ClusterInfo::run_highest_window_request( - &recycler, - &socketaddr_any!(), - Some(&blockstore), - 2, - 1, - ); - let rv: Vec = rv - .expect("packets") - .packets - .into_iter() - .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok()) - .collect(); - assert!(!rv.is_empty()); - let index = blockstore.meta(2).unwrap().unwrap().received - 1; - assert_eq!(rv[0].index(), index as u32); - assert_eq!(rv[0].slot(), 2); - - let rv = ClusterInfo::run_highest_window_request( - &recycler, - &socketaddr_any!(), - Some(&blockstore), - 2, 
- index + 1, - ); - assert!(rv.is_none()); - } - - Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); - } - - #[test] - fn run_orphan() { - solana_logger::setup(); - let recycler = PacketsRecycler::default(); - let ledger_path = get_tmp_ledger_path!(); - { - let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - let rv = - ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0); - assert!(rv.is_none()); - - // Create slots 1, 2, 3 with 5 shreds apiece - let (shreds, _) = make_many_slot_entries(1, 3, 5); - - blockstore - .insert_shreds(shreds, None, false) - .expect("Expect successful ledger write"); - - // We don't have slot 4, so we don't know how to service this requeset - let rv = - ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5); - assert!(rv.is_none()); - - // For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively - // for this request - let rv: Vec<_> = - ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5) - .expect("run_orphan packets") - .packets - .iter() - .map(|b| b.clone()) - .collect(); - let expected: Vec<_> = (1..=3) - .rev() - .map(|slot| { - let index = blockstore.meta(slot).unwrap().unwrap().received - 1; - ClusterInfo::get_data_shred_as_packet( - &blockstore, - slot, - index, - &socketaddr_any!(), - ) - .unwrap() - .unwrap() - }) - .collect(); - assert_eq!(rv, expected) - } - - Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); - } fn assert_in_range(x: u16, range: (u16, u16)) { assert!(x >= range.0); @@ -2671,13 +2160,16 @@ mod tests { } fn test_split_messages(value: CrdsValue) { - const NUM_VALUES: usize = 30; + const NUM_VALUES: u64 = 30; let value_size = value.size(); - let expected_len = NUM_VALUES / (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1) as usize; - let msgs = vec![value; NUM_VALUES]; + let num_values_per_payload = (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1); + + // Expected len is the ceiling of the division + let expected_len = (NUM_VALUES + num_values_per_payload - 1) / num_values_per_payload; + let msgs = vec![value; NUM_VALUES as usize]; let split = ClusterInfo::split_gossip_messages(msgs); - assert!(split.len() <= expected_len); + assert!(split.len() as u64 <= expected_len); } #[test] @@ -2850,25 +2342,6 @@ mod tests { - serialized_size(&PruneData::default()).unwrap(), ); - // make sure repairs are always smaller than the gossip messages - assert!( - max_protocol_size - > serialized_size(&Protocol::RequestWindowIndex(ContactInfo::default(), 0, 0)) - .unwrap() - ); - assert!( - max_protocol_size - > serialized_size(&Protocol::RequestHighestWindowIndex( - ContactInfo::default(), - 0, - 0 - )) - .unwrap() - ); - assert!( - max_protocol_size - > serialized_size(&Protocol::RequestOrphan(ContactInfo::default(), 0)).unwrap() - ); // finally assert the header size estimation is correct assert_eq!(MAX_PROTOCOL_HEADER_SIZE, max_protocol_size); } diff --git a/core/src/contact_info.rs b/core/src/contact_info.rs index feaaf334e..cf0f437b5 100644 --- a/core/src/contact_info.rs +++ b/core/src/contact_info.rs @@ -17,7 +17,7 @@ pub struct ContactInfo { pub tvu: SocketAddr, /// address to forward shreds to pub tvu_forwards: SocketAddr, - /// address to send repairs to + /// address to send repair responses to pub repair: SocketAddr, /// transactions address pub tpu: SocketAddr, @@ -29,6 +29,8 @@ pub struct ContactInfo { pub rpc: SocketAddr, /// websocket for JSON-RPC push 
notifications pub rpc_pubsub: SocketAddr, + /// address to send repair requests to + pub serve_repair: SocketAddr, /// latest wallclock picked pub wallclock: u64, /// node shred version @@ -85,6 +87,7 @@ impl Default for ContactInfo { storage_addr: socketaddr_any!(), rpc: socketaddr_any!(), rpc_pubsub: socketaddr_any!(), + serve_repair: socketaddr_any!(), wallclock: 0, shred_version: 0, } @@ -104,6 +107,7 @@ impl ContactInfo { storage_addr: SocketAddr, rpc: SocketAddr, rpc_pubsub: SocketAddr, + serve_repair: SocketAddr, now: u64, ) -> Self { Self { @@ -117,6 +121,7 @@ impl ContactInfo { storage_addr, rpc, rpc_pubsub, + serve_repair, wallclock: now, shred_version: 0, } @@ -134,6 +139,7 @@ impl ContactInfo { socketaddr!("127.0.0.1:1240"), socketaddr!("127.0.0.1:1241"), socketaddr!("127.0.0.1:1242"), + socketaddr!("127.0.0.1:1243"), now, ) } @@ -154,6 +160,7 @@ impl ContactInfo { addr, addr, addr, + addr, 0, ) } @@ -174,6 +181,7 @@ impl ContactInfo { let repair = next_port(&bind_addr, 5); let rpc_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT); let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); + let serve_repair = next_port(&bind_addr, 6); Self::new( pubkey, gossip_addr, @@ -185,6 +193,7 @@ impl ContactInfo { "0.0.0.0:0".parse().unwrap(), rpc_addr, rpc_pubsub_addr, + serve_repair, timestamp(), ) } @@ -209,6 +218,7 @@ impl ContactInfo { daddr, daddr, daddr, + daddr, timestamp(), ) } @@ -267,6 +277,7 @@ mod tests { assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); assert!(ci.storage_addr.ip().is_unspecified()); + assert!(ci.serve_repair.ip().is_unspecified()); } #[test] fn test_multicast() { @@ -278,6 +289,7 @@ mod tests { assert!(ci.rpc_pubsub.ip().is_multicast()); assert!(ci.tpu.ip().is_multicast()); assert!(ci.storage_addr.ip().is_multicast()); + assert!(ci.serve_repair.ip().is_multicast()); } #[test] fn test_entry_point() { @@ -290,6 +302,7 @@ mod tests { assert!(ci.rpc_pubsub.ip().is_unspecified()); assert!(ci.tpu.ip().is_unspecified()); assert!(ci.storage_addr.ip().is_unspecified()); + assert!(ci.serve_repair.ip().is_unspecified()); } #[test] fn test_socketaddr() { @@ -302,7 +315,9 @@ mod tests { assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT); assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT); assert!(ci.storage_addr.ip().is_unspecified()); + assert_eq!(ci.serve_repair.port(), 16); } + #[test] fn replayed_data_new_with_socketaddr_with_pubkey() { let keypair = Keypair::new(); @@ -323,6 +338,9 @@ mod tests { d1.rpc_pubsub, socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PUBSUB_PORT)) ); + assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238")); + assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239")); + assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240")); } #[test] diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 374fba896..ac9fa3a80 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -6,7 +6,6 @@ use crate::streamer; use rand::{thread_rng, Rng}; use solana_client::thin_client::{create_client, ThinClient}; use solana_ledger::bank_forks::BankForks; -use solana_ledger::blockstore::Blockstore; use solana_perf::recycler::Recycler; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -24,7 +23,6 @@ pub struct GossipService { impl GossipService { pub fn new( cluster_info: &Arc>, - blockstore: Option>, bank_forks: Option>>, gossip_socket: UdpSocket, exit: &Arc, @@ -47,7 +45,6 @@ 
impl GossipService { let t_responder = streamer::responder("gossip", gossip_socket, response_receiver); let t_listen = ClusterInfo::listen( cluster_info.clone(), - blockstore, bank_forks.clone(), request_receiver, response_sender.clone(), @@ -283,8 +280,7 @@ fn make_gossip_node( cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint)); } let cluster_info = Arc::new(RwLock::new(cluster_info)); - let gossip_service = - GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit); + let gossip_service = GossipService::new(&cluster_info.clone(), None, gossip_socket, &exit); (gossip_service, ip_echo, cluster_info) } @@ -303,7 +299,7 @@ mod tests { let tn = Node::new_localhost(); let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone()); let c = Arc::new(RwLock::new(cluster_info)); - let d = GossipService::new(&c, None, None, tn.sockets.gossip, &exit); + let d = GossipService::new(&c, None, tn.sockets.gossip, &exit); exit.store(true, Ordering::Relaxed); d.join().unwrap(); } diff --git a/core/src/lib.rs b/core/src/lib.rs index 08bde97ad..2cfac0634 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -43,6 +43,8 @@ pub mod rpc_pubsub_service; pub mod rpc_service; pub mod rpc_subscriptions; pub mod sendmmsg; +pub mod serve_repair; +pub mod serve_repair_service; pub mod sigverify; pub mod sigverify_shreds; pub mod sigverify_stage; diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 2dc77c1f7..9a67dc363 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -1,8 +1,10 @@ //! The `repair_service` module implements the tools necessary to generate a thread which //! regularly finds missing shreds in the ledger and sends repair requests for those shreds use crate::{ - cluster_info::ClusterInfo, cluster_info_repair_listener::ClusterInfoRepairListener, + cluster_info::ClusterInfo, + cluster_info_repair_listener::ClusterInfoRepairListener, result::Result, + serve_repair::{RepairType, ServeRepair}, }; use solana_ledger::{ bank_forks::BankForks, @@ -33,23 +35,6 @@ pub enum RepairStrategy { }, } -#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] -pub enum RepairType { - Orphan(Slot), - HighestShred(Slot, u64), - Shred(Slot, u64), -} - -impl RepairType { - pub fn slot(&self) -> Slot { - match self { - RepairType::Orphan(slot) => *slot, - RepairType::HighestShred(slot, _) => *slot, - RepairType::Shred(slot, _) => *slot, - } - } -} - pub struct RepairSlotRange { pub start: Slot, pub end: Slot, @@ -116,6 +101,7 @@ impl RepairService { cluster_info: &Arc>, repair_strategy: RepairStrategy, ) { + let serve_repair = ServeRepair::new(cluster_info.clone()); let mut epoch_slots: BTreeSet = BTreeSet::new(); let id = cluster_info.read().unwrap().id(); let mut current_root = 0; @@ -173,9 +159,7 @@ impl RepairService { let reqs: Vec<_> = repairs .into_iter() .filter_map(|repair_request| { - cluster_info - .read() - .unwrap() + serve_repair .repair_request(&repair_request) .map(|result| (result, repair_request)) .ok() diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs new file mode 100644 index 000000000..0fee536a2 --- /dev/null +++ b/core/src/serve_repair.rs @@ -0,0 +1,676 @@ +use crate::packet::limited_deserialize; +use crate::streamer::{PacketReceiver, PacketSender}; +use crate::{ + cluster_info::{ClusterInfo, ClusterInfoError}, + contact_info::ContactInfo, + packet::Packet, + result::Result, +}; +use bincode::serialize; +use rand::{thread_rng, Rng}; +use solana_ledger::blockstore::Blockstore; 
+use solana_measure::thread_mem_usage;
+use solana_metrics::{datapoint_debug, inc_new_counter_debug};
+use solana_perf::packet::{Packets, PacketsRecycler};
+use solana_sdk::{
+    clock::Slot,
+    signature::{Keypair, KeypairUtil},
+    timing::duration_as_ms,
+};
+use std::{
+    net::SocketAddr,
+    sync::atomic::{AtomicBool, Ordering},
+    sync::{Arc, RwLock},
+    thread::{Builder, JoinHandle},
+    time::{Duration, Instant},
+};
+
+/// the number of slots to respond with when responding to `Orphan` requests
+pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
+
+#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
+pub enum RepairType {
+    Orphan(Slot),
+    HighestShred(Slot, u64),
+    Shred(Slot, u64),
+}
+
+impl RepairType {
+    pub fn slot(&self) -> Slot {
+        match self {
+            RepairType::Orphan(slot) => *slot,
+            RepairType::HighestShred(slot, _) => *slot,
+            RepairType::Shred(slot, _) => *slot,
+        }
+    }
+}
+
+/// Window protocol messages
+#[derive(Serialize, Deserialize, Debug)]
+enum RepairProtocol {
+    WindowIndex(ContactInfo, u64, u64),
+    HighestWindowIndex(ContactInfo, u64, u64),
+    Orphan(ContactInfo, u64),
+}
+
+#[derive(Clone)]
+pub struct ServeRepair {
+    /// set the keypair that will be used to sign repair responses
+    keypair: Arc<Keypair>,
+    my_info: ContactInfo,
+    cluster_info: Arc<RwLock<ClusterInfo>>,
+}
+
+impl ServeRepair {
+    /// Without a valid keypair gossip will not function. Only useful for tests.
+    pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self {
+        Self::new(Arc::new(RwLock::new(
+            ClusterInfo::new_with_invalid_keypair(contact_info),
+        )))
+    }
+
+    pub fn new(cluster_info: Arc<RwLock<ClusterInfo>>) -> Self {
+        let (keypair, my_info) = {
+            let r_cluster_info = cluster_info.read().unwrap();
+            (r_cluster_info.keypair.clone(), r_cluster_info.my_data())
+        };
+        Self {
+            keypair,
+            my_info,
+            cluster_info,
+        }
+    }
+
+    pub fn my_info(&self) -> &ContactInfo {
+        &self.my_info
+    }
+
+    pub fn keypair(&self) -> &Arc<Keypair> {
+        &self.keypair
+    }
+
+    fn get_repair_sender(request: &RepairProtocol) -> &ContactInfo {
+        match request {
+            RepairProtocol::WindowIndex(ref from, _, _) => from,
+            RepairProtocol::HighestWindowIndex(ref from, _, _) => from,
+            RepairProtocol::Orphan(ref from, _) => from,
+        }
+    }
+
+    fn handle_repair(
+        me: &Arc<RwLock<ServeRepair>>,
+        recycler: &PacketsRecycler,
+        from_addr: &SocketAddr,
+        blockstore: Option<&Arc<Blockstore>>,
+        request: RepairProtocol,
+    ) -> Option<Packets> {
+        let now = Instant::now();
+
+        //TODO verify from is signed
+        let my_id = me.read().unwrap().keypair.pubkey();
+        let from = Self::get_repair_sender(&request);
+        if from.id == my_id {
+            warn!(
+                "{}: Ignored received repair request from ME {}",
+                my_id, from.id,
+            );
+            inc_new_counter_debug!("serve_repair-handle-repair--eq", 1);
+            return None;
+        }
+
+        let (res, label) = {
+            match &request {
+                RepairProtocol::WindowIndex(from, slot, shred_index) => {
+                    inc_new_counter_debug!("serve_repair-request-window-index", 1);
+                    (
+                        Self::run_window_request(
+                            recycler,
+                            from,
+                            &from_addr,
+                            blockstore,
+                            &me.read().unwrap().my_info,
+                            *slot,
+                            *shred_index,
+                        ),
+                        "WindowIndex",
+                    )
+                }
+
+                RepairProtocol::HighestWindowIndex(_, slot, highest_index) => {
+                    inc_new_counter_debug!("serve_repair-request-highest-window-index", 1);
+                    (
+                        Self::run_highest_window_request(
+                            recycler,
+                            &from_addr,
+                            blockstore,
+                            *slot,
+                            *highest_index,
+                        ),
+                        "HighestWindowIndex",
+                    )
+                }
+                RepairProtocol::Orphan(_, slot) => {
+                    inc_new_counter_debug!("serve_repair-request-orphan", 1);
+                    (
+                        Self::run_orphan(
+                            recycler,
+                            &from_addr,
+                            blockstore,
+                            *slot,
+                            MAX_ORPHAN_REPAIR_RESPONSES,
+                        ),
+                        "Orphan",
+                    )
+                }
+            }
+        };
+
+        trace!("{}: received repair request: {:?}", my_id, request);
+        Self::report_time_spent(label, &now.elapsed(), "");
+        res
+    }
+
+    fn report_time_spent(label: &str, time: &Duration, extra: &str) {
+        let count = duration_as_ms(time);
+        if count > 5 {
+            info!("{} took: {} ms {}", label, count, extra);
+        }
+    }
+
+    /// Process messages from the network
+    fn run_listen(
+        obj: &Arc<RwLock<ServeRepair>>,
+        recycler: &PacketsRecycler,
+        blockstore: Option<&Arc<Blockstore>>,
+        requests_receiver: &PacketReceiver,
+        response_sender: &PacketSender,
+    ) -> Result<()> {
+        //TODO cache connections
+        let timeout = Duration::new(1, 0);
+        let reqs = requests_receiver.recv_timeout(timeout)?;
+
+        Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender);
+        Ok(())
+    }
+
+    pub fn listen(
+        me: Arc<RwLock<ServeRepair>>,
+        blockstore: Option<Arc<Blockstore>>,
+        requests_receiver: PacketReceiver,
+        response_sender: PacketSender,
+        exit: &Arc<AtomicBool>,
+    ) -> JoinHandle<()> {
+        let exit = exit.clone();
+        let recycler = PacketsRecycler::default();
+        Builder::new()
+            .name("solana-repair-listen".to_string())
+            .spawn(move || loop {
+                let e = Self::run_listen(
+                    &me,
+                    &recycler,
+                    blockstore.as_ref(),
+                    &requests_receiver,
+                    &response_sender,
+                );
+                if exit.load(Ordering::Relaxed) {
+                    return;
+                }
+                if e.is_err() {
+                    info!("repair listener error: {:?}", e);
+                }
+                thread_mem_usage::datapoint("solana-repair-listen");
+            })
+            .unwrap()
+    }
+
+    fn handle_packets(
+        me: &Arc<RwLock<ServeRepair>>,
+        recycler: &PacketsRecycler,
+        blockstore: Option<&Arc<Blockstore>>,
+        packets: Packets,
+        response_sender: &PacketSender,
+    ) {
+        // iterate over the packets and respond to each deserialized repair request
+        let allocated = thread_mem_usage::Allocatedp::default();
+        packets.packets.iter().for_each(|packet| {
+            let start = allocated.get();
+            let from_addr = packet.meta.addr();
+            limited_deserialize(&packet.data[..packet.meta.size])
+                .into_iter()
+                .for_each(|request| {
+                    let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request);
+                    if let Some(rsp) = rsp {
+                        let _ignore_disconnect = response_sender.send(rsp);
+                    }
+                });
+            datapoint_debug!(
+                "solana-serve-repair-memory",
+                ("serve_repair", (allocated.get() - start) as i64, i64),
+            );
+        });
+    }
+
+    fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
+        let req = RepairProtocol::WindowIndex(self.my_info.clone(), slot, shred_index);
+        let out = serialize(&req)?;
+        Ok(out)
+    }
+
+    fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
+        let req = RepairProtocol::HighestWindowIndex(self.my_info.clone(), slot, shred_index);
+        let out = serialize(&req)?;
+        Ok(out)
+    }
+
+    fn orphan_bytes(&self, slot: Slot) -> Result<Vec<u8>> {
+        let req = RepairProtocol::Orphan(self.my_info.clone(), slot);
+        let out = serialize(&req)?;
+        Ok(out)
+    }
+
+    pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
+        // find a peer that appears to be accepting replication and has the desired slot, as indicated
+        // by a valid tvu port location
+        let valid: Vec<_> = self
+            .cluster_info
+            .read()
+            .unwrap()
+            .repair_peers(repair_request.slot());
+        if valid.is_empty() {
+            return Err(ClusterInfoError::NoPeers.into());
+        }
+        let n = thread_rng().gen::<usize>() % valid.len();
+        let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port
+        let out = self.map_repair_request(repair_request)?;
+
+        Ok((addr, out))
+    }
+
+    pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
+        match repair_request {
+            RepairType::Shred(slot, shred_index) => {
+                datapoint_debug!(
+                    "serve_repair-repair",
+                    ("repair-slot", *slot, i64),
+                    ("repair-ix", *shred_index, i64)
+                );
+                Ok(self.window_index_request_bytes(*slot, *shred_index)?)
+            }
+            RepairType::HighestShred(slot, shred_index) => {
+                datapoint_debug!(
+                    "serve_repair-repair_highest",
+                    ("repair-highest-slot", *slot, i64),
+                    ("repair-highest-ix", *shred_index, i64)
+                );
+                Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
+            }
+            RepairType::Orphan(slot) => {
+                datapoint_debug!("serve_repair-repair_orphan", ("repair-orphan", *slot, i64));
+                Ok(self.orphan_bytes(*slot)?)
+            }
+        }
+    }
+
+    fn run_window_request(
+        recycler: &PacketsRecycler,
+        from: &ContactInfo,
+        from_addr: &SocketAddr,
+        blockstore: Option<&Arc<Blockstore>>,
+        me: &ContactInfo,
+        slot: Slot,
+        shred_index: u64,
+    ) -> Option<Packets> {
+        if let Some(blockstore) = blockstore {
+            // Try to find the requested index in one of the slots
+            let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr);
+
+            if let Ok(Some(packet)) = packet {
+                inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
+                return Some(Packets::new_with_recycler_data(
+                    recycler,
+                    "run_window_request",
+                    vec![packet],
+                ));
+            }
+        }
+
+        inc_new_counter_debug!("serve_repair-window-request-fail", 1);
+        trace!(
+            "{}: failed WindowIndex {} {} {}",
+            me.id,
+            from.id,
+            slot,
+            shred_index,
+        );
+
+        None
+    }
+
+    fn run_highest_window_request(
+        recycler: &PacketsRecycler,
+        from_addr: &SocketAddr,
+        blockstore: Option<&Arc<Blockstore>>,
+        slot: Slot,
+        highest_index: u64,
+    ) -> Option<Packets> {
+        let blockstore = blockstore?;
+        // Try to find the requested index in one of the slots
+        let meta = blockstore.meta(slot).ok()??;
+        if meta.received > highest_index {
+            // meta.received must be at least 1 by this point
+            let packet =
+                Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr)
+                    .ok()??;
+            return Some(Packets::new_with_recycler_data(
+                recycler,
+                "run_highest_window_request",
+                vec![packet],
+            ));
+        }
+        None
+    }
+
+    fn run_orphan(
+        recycler: &PacketsRecycler,
+        from_addr: &SocketAddr,
+        blockstore: Option<&Arc<Blockstore>>,
+        mut slot: Slot,
+        max_responses: usize,
+    ) -> Option<Packets> {
+        let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
+        if let Some(blockstore) = blockstore {
+            // Try to find the next "n" parent slots of the input slot
+            while let Ok(Some(meta)) = blockstore.meta(slot) {
+                if meta.received == 0 {
+                    break;
+                }
+                let packet =
+                    Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr);
+                if let Ok(Some(packet)) = packet {
+                    res.packets.push(packet);
+                }
+                if meta.is_parent_set() && res.packets.len() <= max_responses {
+                    slot = meta.parent_slot;
+                } else {
+                    break;
+                }
+            }
+        }
+        if res.is_empty() {
+            return None;
+        }
+        Some(res)
+    }
+
+    fn get_data_shred_as_packet(
+        blockstore: &Arc<Blockstore>,
+        slot: Slot,
+        shred_index: u64,
+        dest: &SocketAddr,
+    ) -> Result<Option<Packet>> {
+        let data = blockstore.get_data_shred(slot, shred_index)?;
+        Ok(data.map(|data| {
+            let mut packet = Packet::default();
+            packet.meta.size = data.len();
+            packet.meta.set_addr(dest);
+            packet.data.copy_from_slice(&data);
+            packet
+        }))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::result::Error;
+    use solana_ledger::get_tmp_ledger_path;
+    use solana_ledger::{
+        blockstore::make_many_slot_entries,
+        blockstore_processor::fill_blockstore_slot_with_ticks,
+        shred::{
+            max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader,
+        },
+    };
+    use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp};
+
+    /// test run_highest_window_request responds with the right shred, and does not overrun
+    #[test]
+    fn run_highest_window_request() {
+        let recycler = PacketsRecycler::default();
+        solana_logger::setup();
+        let ledger_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
+            let rv = ServeRepair::run_highest_window_request(
+                &recycler,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                0,
+                0,
+            );
+            assert!(rv.is_none());
+
+            let _ = fill_blockstore_slot_with_ticks(
+                &blockstore,
+                max_ticks_per_n_shreds(1) + 1,
+                2,
+                1,
+                Hash::default(),
+            );
+
+            let rv = ServeRepair::run_highest_window_request(
+                &recycler,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                2,
+                1,
+            );
+            let rv: Vec<Shred> = rv
+                .expect("packets")
+                .packets
+                .into_iter()
+                .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
+                .collect();
+            assert!(!rv.is_empty());
+            let index = blockstore.meta(2).unwrap().unwrap().received - 1;
+            assert_eq!(rv[0].index(), index as u32);
+            assert_eq!(rv[0].slot(), 2);
+
+            let rv = ServeRepair::run_highest_window_request(
+                &recycler,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                2,
+                index + 1,
+            );
+            assert!(rv.is_none());
+        }
+
+        Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
+    }
+
+    /// test window requests respond with the right shred, and do not overrun
+    #[test]
+    fn run_window_request() {
+        let recycler = PacketsRecycler::default();
+        solana_logger::setup();
+        let ledger_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
+            let me = ContactInfo::new(
+                &Pubkey::new_rand(),
+                socketaddr!("127.0.0.1:1234"),
+                socketaddr!("127.0.0.1:1235"),
+                socketaddr!("127.0.0.1:1236"),
+                socketaddr!("127.0.0.1:1237"),
+                socketaddr!("127.0.0.1:1238"),
+                socketaddr!("127.0.0.1:1239"),
+                socketaddr!("127.0.0.1:1240"),
+                socketaddr!("127.0.0.1:1241"),
+                socketaddr!("127.0.0.1:1242"),
+                socketaddr!("127.0.0.1:1243"),
+                0,
+            );
+            let rv = ServeRepair::run_window_request(
+                &recycler,
+                &me,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                &me,
+                0,
+                0,
+            );
+            assert!(rv.is_none());
+            let mut common_header = ShredCommonHeader::default();
+            common_header.slot = 2;
+            common_header.index = 1;
+            let mut data_header = DataShredHeader::default();
+            data_header.parent_offset = 1;
+            let shred_info = Shred::new_empty_from_header(
+                common_header,
+                data_header,
+                CodingShredHeader::default(),
+            );
+
+            blockstore
+                .insert_shreds(vec![shred_info], None, false)
+                .expect("Expect successful ledger write");
+
+            let rv = ServeRepair::run_window_request(
+                &recycler,
+                &me,
+                &socketaddr_any!(),
+                Some(&blockstore),
+                &me,
+                2,
+                1,
+            );
+            assert!(!rv.is_none());
+            let rv: Vec<Shred> = rv
+                .expect("packets")
+                .packets
+                .into_iter()
+                .filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
+                .collect();
+            assert_eq!(rv[0].index(), 1);
+            assert_eq!(rv[0].slot(), 2);
+        }
+
+        Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
+    }
+
+    #[test]
+    fn window_index_request() {
+        let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
+        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me)));
+        let serve_repair = ServeRepair::new(cluster_info.clone());
+        let rv = serve_repair.repair_request(&RepairType::Shred(0, 0));
+        assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
+
+        let serve_repair_addr = socketaddr!([127, 0, 0, 1], 1243);
+        let nxt = ContactInfo::new(
+            &Pubkey::new_rand(),
+            socketaddr!([127, 0, 0, 1], 1234),
+            socketaddr!([127, 0, 0, 1], 1235),
+            socketaddr!([127, 0, 0, 1], 1236),
+            socketaddr!([127, 0, 0, 1], 1237),
+            socketaddr!([127, 0, 0, 1], 1238),
+            socketaddr!([127, 0, 0, 1], 1239),
+            socketaddr!([127, 0, 0, 1], 1240),
+            socketaddr!([127, 0, 0, 1], 1241),
+            socketaddr!([127, 0, 0, 1], 1242),
+            serve_repair_addr,
+            0,
+        );
+        cluster_info.write().unwrap().insert_info(nxt.clone());
+        let rv = serve_repair
+            .repair_request(&RepairType::Shred(0, 0))
+            .unwrap();
+        assert_eq!(nxt.serve_repair, serve_repair_addr);
+        assert_eq!(rv.0, nxt.serve_repair);
+
+        let serve_repair_addr2 = socketaddr!([127, 0, 0, 2], 1243);
+        let nxt = ContactInfo::new(
+            &Pubkey::new_rand(),
+            socketaddr!([127, 0, 0, 1], 1234),
+            socketaddr!([127, 0, 0, 1], 1235),
+            socketaddr!([127, 0, 0, 1], 1236),
+            socketaddr!([127, 0, 0, 1], 1237),
+            socketaddr!([127, 0, 0, 1], 1238),
+            socketaddr!([127, 0, 0, 1], 1239),
+            socketaddr!([127, 0, 0, 1], 1240),
+            socketaddr!([127, 0, 0, 1], 1241),
+            socketaddr!([127, 0, 0, 1], 1242),
+            serve_repair_addr2,
+            0,
+        );
+        cluster_info.write().unwrap().insert_info(nxt);
+        let mut one = false;
+        let mut two = false;
+        while !one || !two {
+            //this randomly picks an option, so eventually it should pick both
+            let rv = serve_repair
+                .repair_request(&RepairType::Shred(0, 0))
+                .unwrap();
+            if rv.0 == serve_repair_addr {
+                one = true;
+            }
+            if rv.0 == serve_repair_addr2 {
+                two = true;
+            }
+        }
+        assert!(one && two);
+    }
+
+    #[test]
+    fn run_orphan() {
+        solana_logger::setup();
+        let recycler = PacketsRecycler::default();
+        let ledger_path = get_tmp_ledger_path!();
+        {
+            let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
+            let rv =
+                ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0);
+            assert!(rv.is_none());
+
+            // Create slots 1, 2, 3 with 5 shreds apiece
+            let (shreds, _) = make_many_slot_entries(1, 3, 5);
+
+            blockstore
+                .insert_shreds(shreds, None, false)
+                .expect("Expect successful ledger write");
+
+            // We don't have slot 4, so we don't know how to service this request
+            let rv =
+                ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5);
+            assert!(rv.is_none());
+
+            // For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively
+            // for this request
+            let rv: Vec<_> =
+                ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5)
+                    .expect("run_orphan packets")
+                    .packets
+                    .iter()
+                    .map(|b| b.clone())
+                    .collect();
+            let expected: Vec<_> = (1..=3)
+                .rev()
+                .map(|slot| {
+                    let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
+                    ServeRepair::get_data_shred_as_packet(
+                        &blockstore,
+                        slot,
+                        index,
+                        &socketaddr_any!(),
+                    )
+                    .unwrap()
+                    .unwrap()
+                })
+                .collect();
+            assert_eq!(rv, expected)
+        }
+
+        Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
+    }
+}
diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs
new file mode 100644
index 000000000..2e883fac7
--- /dev/null
+++ b/core/src/serve_repair_service.rs
@@ -0,0 +1,57 @@
+use crate::serve_repair::ServeRepair;
+use crate::streamer;
+use solana_ledger::blockstore::Blockstore;
+use solana_perf::recycler::Recycler;
+use std::net::UdpSocket;
+use std::sync::atomic::AtomicBool;
+use std::sync::mpsc::channel;
+use std::sync::{Arc, RwLock};
+use std::thread::{self, JoinHandle};
+
+pub struct ServeRepairService {
+    thread_hdls: Vec<JoinHandle<()>>,
+}
+
+impl ServeRepairService {
+    pub fn new(
+        serve_repair: &Arc<RwLock<ServeRepair>>,
+        blockstore: Option<Arc<Blockstore>>,
+        serve_repair_socket: UdpSocket,
+        exit: &Arc<AtomicBool>,
+    ) -> Self {
+        let (request_sender, request_receiver) = channel();
+        let serve_repair_socket = Arc::new(serve_repair_socket);
+        trace!(
+            "ServeRepairService: id: {}, listening on: {:?}",
+            &serve_repair.read().unwrap().my_info().id,
+            serve_repair_socket.local_addr().unwrap()
+        );
+        let t_receiver = streamer::receiver(
+            serve_repair_socket.clone(),
+            &exit,
+            request_sender,
+            Recycler::default(),
+            "serve_repair_receiver",
+        );
+        let (response_sender, response_receiver) = channel();
+        let t_responder =
+            streamer::responder("serve-repairs", serve_repair_socket, response_receiver);
+        let t_listen = ServeRepair::listen(
+            serve_repair.clone(),
+            blockstore,
+            request_receiver,
+            response_sender,
+            exit,
+        );
+
+        let thread_hdls = vec![t_receiver, t_responder, t_listen];
+        Self { thread_hdls }
+    }
+
+    pub fn join(self) -> thread::Result<()> {
+        for thread_hdl in self.thread_hdls {
+            thread_hdl.join()?;
+        }
+        Ok(())
+    }
+}
diff --git a/core/src/validator.rs b/core/src/validator.rs
index c2f84bc87..cfff47632 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -12,6 +12,8 @@ use crate::{
     rpc_pubsub_service::PubSubService,
     rpc_service::JsonRpcService,
     rpc_subscriptions::RpcSubscriptions,
+    serve_repair::ServeRepair,
+    serve_repair_service::ServeRepairService,
     sigverify,
     storage_stage::StorageState,
     tpu::Tpu,
@@ -121,6 +123,7 @@ pub struct Validator {
     rpc_service: Option<(JsonRpcService, PubSubService)>,
     transaction_status_service: Option<TransactionStatusService>,
     gossip_service: GossipService,
+    serve_repair_service: ServeRepairService,
     poh_recorder: Arc<Mutex<PohRecorder>>,
     poh_service: PohService,
     tpu: Tpu,
@@ -302,12 +305,19 @@ impl Validator {
         let gossip_service = GossipService::new(
             &cluster_info,
-            Some(blockstore.clone()),
             Some(bank_forks.clone()),
             node.sockets.gossip,
             &exit,
         );
 
+        let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone())));
+        let serve_repair_service = ServeRepairService::new(
+            &serve_repair,
+            Some(blockstore.clone()),
+            node.sockets.serve_repair,
+            &exit,
+        );
+
         // Insert the entrypoint info, should only be None if this node
         // is the bootstrap validator
         if let Some(entrypoint_info) = entrypoint_info_option {
@@ -403,6 +413,7 @@ impl Validator {
         Self {
             id,
             gossip_service,
+            serve_repair_service,
             rpc_service,
             transaction_status_service,
             tpu,
@@ -463,6 +474,7 @@ impl Validator {
         }
         self.gossip_service.join()?;
+        self.serve_repair_service.join()?;
         self.tpu.join()?;
         self.tvu.join()?;
         self.ip_echo_server.shutdown_now();
diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs
index 0a4911bf1..4a650cd18 100644
--- a/core/tests/gossip.rs
+++ b/core/tests/gossip.rs
@@ -21,8 +21,7 @@ fn test_node(exit: &Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, GossipService
         test_node.info.clone(),
         keypair,
     )));
-    let gossip_service =
-        GossipService::new(&cluster_info, None, None, test_node.sockets.gossip, exit);
+    let gossip_service = GossipService::new(&cluster_info, None, test_node.sockets.gossip, exit);
     let _ = cluster_info.read().unwrap().my_data();
     (
         cluster_info,
diff --git a/local-cluster/tests/archiver.rs b/local-cluster/tests/archiver.rs
index 54cbe7e12..f4b641d1e 100644
--- a/local-cluster/tests/archiver.rs
+++ b/local-cluster/tests/archiver.rs
@@ -6,6 +6,7 @@ use solana_core::{
     cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE},
     contact_info::ContactInfo,
     gossip_service::discover_cluster,
+    serve_repair::ServeRepair,
     storage_stage::SLOTS_PER_TURN_TEST,
     validator::ValidatorConfig,
 };
@@ -61,10 +62,11 @@ fn
run_archiver_startup_basic(num_nodes: usize, num_archivers: usize) { let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( cluster_nodes[0].clone(), ))); + let serve_repair = ServeRepair::new(cluster_info); let path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&path).unwrap()); Archiver::download_from_archiver( - &cluster_info, + &serve_repair, &archiver_info, &blockstore, slots_per_segment, diff --git a/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json b/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json index 5b2487e57..3855c2469 100644 --- a/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json +++ b/metrics/scripts/grafana-provisioning/dashboards/testnet-monitor.json @@ -6903,7 +6903,7 @@ "renderer": "flot", "seriesOverrides": [ { - "alias": "cluster_info-repair_highest.ix", + "alias": "serve_repair-repair_highest.ix", "yaxis": 2 } ], @@ -6928,7 +6928,7 @@ ], "orderByTime": "ASC", "policy": "default", - "query": "SELECT last(\"repair-highest-slot\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT last(\"repair-highest-slot\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", "rawQuery": true, "refId": "C", "resultFormat": "time_series", @@ -6965,7 +6965,7 @@ ], "orderByTime": "ASC", "policy": "default", - "query": "SELECT last(\"repair-highest-ix\") AS \"ix\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT last(\"repair-highest-ix\") AS \"ix\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", "rawQuery": true, "refId": "A", "resultFormat": "time_series", @@ -7064,7 +7064,7 @@ "renderer": "flot", "seriesOverrides": [ { - "alias": "cluster_info-repair.repair-ix", + "alias": "serve_repair-repair.repair-ix", "yaxis": 2 } ], @@ -7089,7 +7089,7 @@ ], "orderByTime": "ASC", "policy": "default", - "query": "SELECT last(\"repair-ix\") AS \"repair-ix\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT last(\"repair-ix\") AS \"repair-ix\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", "rawQuery": true, "refId": "C", "resultFormat": "time_series", @@ -7126,7 +7126,7 @@ ], "orderByTime": "ASC", "policy": "default", - "query": "SELECT last(\"repair-slot\") AS \"repair-slot\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT last(\"repair-slot\") AS \"repair-slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", "rawQuery": true, "refId": "A", "resultFormat": "time_series", @@ -7245,7 +7245,7 @@ ], "orderByTime": "ASC", "policy": "default", - "query": "SELECT last(\"repair-orphan\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair_orphan\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", + "query": "SELECT last(\"repair-orphan\") AS \"slot\" FROM 
\"$testnet\".\"autogen\".\"serve_repair-repair_orphan\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)", "rawQuery": true, "refId": "C", "resultFormat": "time_series", diff --git a/validator/src/main.rs b/validator/src/main.rs index 0f0acb3c9..e67567e24 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -213,7 +213,6 @@ fn get_rpc_addr( let gossip_service = GossipService::new( &cluster_info.clone(), None, - None, node.sockets.gossip.try_clone().unwrap(), &gossip_exit_flag, );