Factor repair from gossip (#8044)

This commit is contained in:
carllin 2020-01-31 14:23:51 -08:00 committed by Michael Vines
parent 60877f9ba4
commit d3712dd26d
13 changed files with 818 additions and 613 deletions

View File

@ -16,6 +16,7 @@ use solana_core::{
packet::{limited_deserialize, PACKET_DATA_SIZE},
repair_service,
repair_service::{RepairService, RepairSlotRange, RepairStrategy},
serve_repair::ServeRepair,
shred_fetch_stage::ShredFetchStage,
sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
storage_stage::NUM_STORAGE_SAMPLES,
@ -195,13 +196,7 @@ impl Archiver {
Blockstore::open(ledger_path).expect("Expected to be able to open database ledger"),
);
let gossip_service = GossipService::new(
&cluster_info,
Some(blockstore.clone()),
None,
node.sockets.gossip,
&exit,
);
let gossip_service = GossipService::new(&cluster_info, None, node.sockets.gossip, &exit);
info!("Connecting to the cluster via {:?}", cluster_entrypoint);
let (nodes, _) =
@ -814,7 +809,7 @@ impl Archiver {
/// It is recommended to use a temporary blockstore for this since the download will not verify
/// shreds received and might impact the chaining of shreds across slots
pub fn download_from_archiver(
cluster_info: &Arc<RwLock<ClusterInfo>>,
serve_repair: &ServeRepair,
archiver_info: &ContactInfo,
blockstore: &Arc<Blockstore>,
slots_per_segment: u64,
@ -834,10 +829,10 @@ impl Archiver {
Recycler::default(),
"archiver_reeciver",
);
let id = cluster_info.read().unwrap().id();
let id = serve_repair.keypair().pubkey();
info!(
"Sending repair requests from: {} to: {}",
cluster_info.read().unwrap().my_data().id,
serve_repair.my_info().id,
archiver_info.gossip
);
let repair_slot_range = RepairSlotRange {
@ -857,9 +852,7 @@ impl Archiver {
let reqs: Vec<_> = repairs
.into_iter()
.filter_map(|repair_request| {
cluster_info
.read()
.unwrap()
serve_repair
.map_repair_request(&repair_request)
.map(|result| ((archiver_info.gossip, result), repair_request))
.ok()

View File

@ -21,7 +21,6 @@ use crate::{
crds_gossip_pull::{CrdsFilter, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS},
crds_value::{self, CrdsData, CrdsValue, CrdsValueLabel, EpochSlots, Vote},
packet::{Packet, PACKET_DATA_SIZE},
repair_service::RepairType,
result::{Error, Result},
sendmmsg::{multicast, send_mmsg},
weighted_shuffle::{weighted_best, weighted_shuffle},
@ -29,8 +28,7 @@ use crate::{
use bincode::{serialize, serialized_size};
use core::cmp;
use itertools::Itertools;
use rand::{thread_rng, Rng};
use solana_ledger::{bank_forks::BankForks, blockstore::Blockstore, staking_utils};
use solana_ledger::{bank_forks::BankForks, staking_utils};
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug, inc_new_counter_error};
use solana_net_utils::{
@ -63,15 +61,12 @@ pub const VALIDATOR_PORT_RANGE: PortRange = (8000, 10_000);
pub const DATA_PLANE_FANOUT: usize = 200;
/// milliseconds we sleep for between gossip requests
pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
/// The maximum size of a bloom filter
pub const MAX_BLOOM_SIZE: usize = 1028;
pub const MAX_BLOOM_SIZE: usize = 1018;
/// The maximum size of a protocol payload
const MAX_PROTOCOL_PAYLOAD_SIZE: u64 = PACKET_DATA_SIZE as u64 - MAX_PROTOCOL_HEADER_SIZE;
/// The largest protocol header size
const MAX_PROTOCOL_HEADER_SIZE: u64 = 204;
const MAX_PROTOCOL_HEADER_SIZE: u64 = 214;
#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
@ -174,12 +169,6 @@ enum Protocol {
PullResponse(Pubkey, Vec<CrdsValue>),
PushMessage(Pubkey, Vec<CrdsValue>),
PruneMessage(Pubkey, PruneData),
/// Window protocol messages
/// TODO: move this message to a different module
RequestWindowIndex(ContactInfo, u64, u64),
RequestHighestWindowIndex(ContactInfo, u64, u64),
RequestOrphan(ContactInfo, u64),
}
impl ClusterInfo {
@ -529,7 +518,7 @@ impl ClusterInfo {
}
/// all tvu peers with valid gossip addrs that likely have the slot being requested
fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
pub fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
let me = self.my_data();
ClusterInfo::tvu_peers(self)
.into_iter()
@ -871,61 +860,6 @@ impl ClusterInfo {
Ok(())
}
pub fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
let req = Protocol::RequestWindowIndex(self.my_data(), slot, shred_index);
let out = serialize(&req)?;
Ok(out)
}
fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
let req = Protocol::RequestHighestWindowIndex(self.my_data(), slot, shred_index);
let out = serialize(&req)?;
Ok(out)
}
fn orphan_bytes(&self, slot: Slot) -> Result<Vec<u8>> {
let req = Protocol::RequestOrphan(self.my_data(), slot);
let out = serialize(&req)?;
Ok(out)
}
pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location
let valid: Vec<_> = self.repair_peers(repair_request.slot());
if valid.is_empty() {
return Err(ClusterInfoError::NoPeers.into());
}
let n = thread_rng().gen::<usize>() % valid.len();
let addr = valid[n].gossip; // send the request to the peer's gossip port
let out = self.map_repair_request(repair_request)?;
Ok((addr, out))
}
pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
match repair_request {
RepairType::Shred(slot, shred_index) => {
datapoint_debug!(
"cluster_info-repair",
("repair-slot", *slot, i64),
("repair-ix", *shred_index, i64)
);
Ok(self.window_index_request_bytes(*slot, *shred_index)?)
}
RepairType::HighestShred(slot, shred_index) => {
datapoint_debug!(
"cluster_info-repair_highest",
("repair-highest-slot", *slot, i64),
("repair-highest-ix", *shred_index, i64)
);
Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
}
RepairType::Orphan(slot) => {
datapoint_debug!("cluster_info-repair_orphan", ("repair-orphan", *slot, i64));
Ok(self.orphan_bytes(*slot)?)
}
}
}
// If the network entrypoint hasn't been discovered yet, add it to the crds table
fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) {
let pull_from_entrypoint = if let Some(entrypoint) = &mut self.entrypoint {
@ -1177,117 +1111,9 @@ impl ClusterInfo {
.unwrap()
}
fn get_data_shred_as_packet(
blockstore: &Arc<Blockstore>,
slot: Slot,
shred_index: u64,
dest: &SocketAddr,
) -> Result<Option<Packet>> {
let data = blockstore.get_data_shred(slot, shred_index)?;
Ok(data.map(|data| {
let mut packet = Packet::default();
packet.meta.size = data.len();
packet.meta.set_addr(dest);
packet.data.copy_from_slice(&data);
packet
}))
}
fn run_window_request(
recycler: &PacketsRecycler,
from: &ContactInfo,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
me: &ContactInfo,
slot: Slot,
shred_index: u64,
) -> Option<Packets> {
if let Some(blockstore) = blockstore {
// Try to find the requested index in one of the slots
let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr);
if let Ok(Some(packet)) = packet {
inc_new_counter_debug!("cluster_info-window-request-ledger", 1);
return Some(Packets::new_with_recycler_data(
recycler,
"run_window_request",
vec![packet],
));
}
}
inc_new_counter_debug!("cluster_info-window-request-fail", 1);
trace!(
"{}: failed RequestWindowIndex {} {} {}",
me.id,
from.id,
slot,
shred_index,
);
None
}
fn run_highest_window_request(
recycler: &PacketsRecycler,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
slot: Slot,
highest_index: u64,
) -> Option<Packets> {
let blockstore = blockstore?;
// Try to find the requested index in one of the slots
let meta = blockstore.meta(slot).ok()??;
if meta.received > highest_index {
// meta.received must be at least 1 by this point
let packet =
Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr)
.ok()??;
return Some(Packets::new_with_recycler_data(
recycler,
"run_highest_window_request",
vec![packet],
));
}
None
}
fn run_orphan(
recycler: &PacketsRecycler,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
mut slot: Slot,
max_responses: usize,
) -> Option<Packets> {
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
if let Some(blockstore) = blockstore {
// Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blockstore.meta(slot) {
if meta.received == 0 {
break;
}
let packet =
Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr);
if let Ok(Some(packet)) = packet {
res.packets.push(packet);
}
if meta.is_parent_set() && res.packets.len() <= max_responses {
slot = meta.parent_slot;
} else {
break;
}
}
}
if res.is_empty() {
return None;
}
Some(res)
}
fn handle_packets(
me: &Arc<RwLock<Self>>,
recycler: &PacketsRecycler,
blockstore: Option<&Arc<Blockstore>>,
stakes: &HashMap<Pubkey, u64>,
packets: Packets,
response_sender: &PacketSender,
@ -1396,13 +1222,6 @@ impl ClusterInfo {
("prune_message", (allocated.get() - start) as i64, i64),
);
}
_ => {
let rsp =
Self::handle_repair(me, recycler, &from_addr, blockstore, request);
if let Some(rsp) = rsp {
let _ignore_disconnect = response_sender.send(rsp);
}
}
})
});
// process the collected pulls together
@ -1535,104 +1354,10 @@ impl ClusterInfo {
}
}
fn get_repair_sender(request: &Protocol) -> &ContactInfo {
match request {
Protocol::RequestWindowIndex(ref from, _, _) => from,
Protocol::RequestHighestWindowIndex(ref from, _, _) => from,
Protocol::RequestOrphan(ref from, _) => from,
_ => panic!("Not a repair request"),
}
}
fn handle_repair(
me: &Arc<RwLock<Self>>,
recycler: &PacketsRecycler,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
request: Protocol,
) -> Option<Packets> {
let now = Instant::now();
//TODO this doesn't depend on cluster_info module, could be moved
//but we are using the listen thread to service these request
//TODO verify from is signed
let self_id = me.read().unwrap().gossip.id;
let from = Self::get_repair_sender(&request);
if from.id == me.read().unwrap().gossip.id {
warn!(
"{}: Ignored received repair request from ME {}",
self_id, from.id,
);
inc_new_counter_debug!("cluster_info-handle-repair--eq", 1);
return None;
}
me.write()
.unwrap()
.gossip
.crds
.update_record_timestamp(&from.id, timestamp());
let my_info = me.read().unwrap().my_data();
let (res, label) = {
match &request {
Protocol::RequestWindowIndex(from, slot, shred_index) => {
inc_new_counter_debug!("cluster_info-request-window-index", 1);
(
Self::run_window_request(
recycler,
from,
&from_addr,
blockstore,
&my_info,
*slot,
*shred_index,
),
"RequestWindowIndex",
)
}
Protocol::RequestHighestWindowIndex(_, slot, highest_index) => {
inc_new_counter_debug!("cluster_info-request-highest-window-index", 1);
(
Self::run_highest_window_request(
recycler,
&from_addr,
blockstore,
*slot,
*highest_index,
),
"RequestHighestWindowIndex",
)
}
Protocol::RequestOrphan(_, slot) => {
inc_new_counter_debug!("cluster_info-request-orphan", 1);
(
Self::run_orphan(
recycler,
&from_addr,
blockstore,
*slot,
MAX_ORPHAN_REPAIR_RESPONSES,
),
"RequestOrphan",
)
}
_ => panic!("Not a repair request"),
}
};
trace!("{}: received repair request: {:?}", self_id, request);
report_time_spent(label, &now.elapsed(), "");
res
}
/// Process messages from the network
fn run_listen(
obj: &Arc<RwLock<Self>>,
recycler: &PacketsRecycler,
blockstore: Option<&Arc<Blockstore>>,
bank_forks: Option<&Arc<RwLock<BankForks>>>,
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
@ -1656,20 +1381,11 @@ impl ClusterInfo {
}
};
Self::handle_packets(
obj,
&recycler,
blockstore,
&stakes,
reqs,
response_sender,
epoch_ms,
);
Self::handle_packets(obj, &recycler, &stakes, reqs, response_sender, epoch_ms);
Ok(())
}
pub fn listen(
me: Arc<RwLock<Self>>,
blockstore: Option<Arc<Blockstore>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
requests_receiver: PacketReceiver,
response_sender: PacketSender,
@ -1683,7 +1399,6 @@ impl ClusterInfo {
let e = Self::run_listen(
&me,
&recycler,
blockstore.as_ref(),
bank_forks.as_ref(),
&requests_receiver,
&response_sender,
@ -1718,6 +1433,7 @@ impl ClusterInfo {
dummy_addr,
dummy_addr,
dummy_addr,
dummy_addr,
timestamp(),
)
}
@ -1798,6 +1514,7 @@ pub struct Sockets {
pub repair: UdpSocket,
pub retransmit_sockets: Vec<UdpSocket>,
pub storage: Option<UdpSocket>,
pub serve_repair: UdpSocket,
}
#[derive(Debug)]
@ -1818,9 +1535,10 @@ impl Node {
let storage = UdpSocket::bind("127.0.0.1:0").unwrap();
let empty = "0.0.0.0:0".parse().unwrap();
let repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let info = ContactInfo::new(
pubkey,
gossip.local_addr().unwrap(),
@ -1832,6 +1550,7 @@ impl Node {
storage.local_addr().unwrap(),
empty,
empty,
serve_repair.local_addr().unwrap(),
timestamp(),
);
@ -1846,6 +1565,7 @@ impl Node {
broadcast,
repair,
retransmit_sockets: vec![retransmit],
serve_repair,
storage: Some(storage),
ip_echo: None,
},
@ -1868,6 +1588,7 @@ impl Node {
let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()];
let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let storage = UdpSocket::bind("0.0.0.0:0").unwrap();
let serve_repair = UdpSocket::bind("127.0.0.1:0").unwrap();
let info = ContactInfo::new(
pubkey,
gossip_addr,
@ -1879,6 +1600,7 @@ impl Node {
storage.local_addr().unwrap(),
rpc_addr,
rpc_pubsub_addr,
serve_repair.local_addr().unwrap(),
timestamp(),
);
Node {
@ -1894,6 +1616,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
storage: None,
serve_repair,
},
}
}
@ -1936,6 +1659,8 @@ impl Node {
multi_bind_in_range(port_range, 8).expect("retransmit multi_bind");
let (repair_port, repair) = Self::bind(port_range);
let (serve_repair_port, serve_repair) = Self::bind(port_range);
let (_, broadcast) = multi_bind_in_range(port_range, 4).expect("broadcast multi_bind");
let info = ContactInfo::new(
@ -1946,6 +1671,7 @@ impl Node {
SocketAddr::new(gossip_addr.ip(), repair_port),
SocketAddr::new(gossip_addr.ip(), tpu_port),
SocketAddr::new(gossip_addr.ip(), tpu_forwards_port),
SocketAddr::new(gossip_addr.ip(), serve_repair_port),
socketaddr_any!(),
socketaddr_any!(),
socketaddr_any!(),
@ -1965,6 +1691,7 @@ impl Node {
repair,
retransmit_sockets,
storage: None,
serve_repair,
ip_echo: Some(ip_echo),
},
}
@ -2001,18 +1728,8 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
mod tests {
use super::*;
use crate::crds_value::CrdsValueLabel;
use crate::repair_service::RepairType;
use crate::result::Error;
use rayon::prelude::*;
use solana_ledger::blockstore::make_many_slot_entries;
use solana_ledger::blockstore::Blockstore;
use solana_ledger::blockstore_processor::fill_blockstore_slot_with_ticks;
use solana_ledger::get_tmp_ledger_path;
use solana_ledger::shred::{
max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader,
};
use solana_perf::test_tx::test_tx;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::collections::HashSet;
use std::net::{IpAddr, Ipv4Addr};
@ -2083,242 +1800,6 @@ mod tests {
let label = CrdsValueLabel::ContactInfo(d.id);
assert!(cluster_info.gossip.crds.lookup(&label).is_none());
}
#[test]
fn window_index_request() {
let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(me);
let rv = cluster_info.repair_request(&RepairType::Shred(0, 0));
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
let gossip_addr = socketaddr!([127, 0, 0, 1], 1234);
let nxt = ContactInfo::new(
&Pubkey::new_rand(),
gossip_addr,
socketaddr!([127, 0, 0, 1], 1235),
socketaddr!([127, 0, 0, 1], 1236),
socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238),
socketaddr!([127, 0, 0, 1], 1239),
socketaddr!([127, 0, 0, 1], 1240),
socketaddr!([127, 0, 0, 1], 1241),
socketaddr!([127, 0, 0, 1], 1242),
0,
);
cluster_info.insert_info(nxt.clone());
let rv = cluster_info
.repair_request(&RepairType::Shred(0, 0))
.unwrap();
assert_eq!(nxt.gossip, gossip_addr);
assert_eq!(rv.0, nxt.gossip);
let gossip_addr2 = socketaddr!([127, 0, 0, 2], 1234);
let nxt = ContactInfo::new(
&Pubkey::new_rand(),
gossip_addr2,
socketaddr!([127, 0, 0, 1], 1235),
socketaddr!([127, 0, 0, 1], 1236),
socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238),
socketaddr!([127, 0, 0, 1], 1239),
socketaddr!([127, 0, 0, 1], 1240),
socketaddr!([127, 0, 0, 1], 1241),
socketaddr!([127, 0, 0, 1], 1242),
0,
);
cluster_info.insert_info(nxt);
let mut one = false;
let mut two = false;
while !one || !two {
//this randomly picks an option, so eventually it should pick both
let rv = cluster_info
.repair_request(&RepairType::Shred(0, 0))
.unwrap();
if rv.0 == gossip_addr {
one = true;
}
if rv.0 == gossip_addr2 {
two = true;
}
}
assert!(one && two);
}
/// test window requests respond with the right shred, and do not overrun
#[test]
fn run_window_request() {
let recycler = PacketsRecycler::default();
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let me = ContactInfo::new(
&Pubkey::new_rand(),
socketaddr!("127.0.0.1:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"),
socketaddr!("127.0.0.1:1239"),
socketaddr!("127.0.0.1:1240"),
socketaddr!("127.0.0.1:1241"),
socketaddr!("127.0.0.1:1242"),
0,
);
let rv = ClusterInfo::run_window_request(
&recycler,
&me,
&socketaddr_any!(),
Some(&blockstore),
&me,
0,
0,
);
assert!(rv.is_none());
let mut common_header = ShredCommonHeader::default();
common_header.slot = 2;
common_header.index = 1;
let mut data_header = DataShredHeader::default();
data_header.parent_offset = 1;
let shred_info = Shred::new_empty_from_header(
common_header,
data_header,
CodingShredHeader::default(),
);
blockstore
.insert_shreds(vec![shred_info], None, false)
.expect("Expect successful ledger write");
let rv = ClusterInfo::run_window_request(
&recycler,
&me,
&socketaddr_any!(),
Some(&blockstore),
&me,
2,
1,
);
assert!(!rv.is_none());
let rv: Vec<Shred> = rv
.expect("packets")
.packets
.into_iter()
.filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
.collect();
assert_eq!(rv[0].index(), 1);
assert_eq!(rv[0].slot(), 2);
}
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
/// test run_window_requestwindow requests respond with the right shred, and do not overrun
#[test]
fn run_highest_window_request() {
let recycler = PacketsRecycler::default();
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let rv = ClusterInfo::run_highest_window_request(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
0,
0,
);
assert!(rv.is_none());
let _ = fill_blockstore_slot_with_ticks(
&blockstore,
max_ticks_per_n_shreds(1) + 1,
2,
1,
Hash::default(),
);
let rv = ClusterInfo::run_highest_window_request(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
2,
1,
);
let rv: Vec<Shred> = rv
.expect("packets")
.packets
.into_iter()
.filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
.collect();
assert!(!rv.is_empty());
let index = blockstore.meta(2).unwrap().unwrap().received - 1;
assert_eq!(rv[0].index(), index as u32);
assert_eq!(rv[0].slot(), 2);
let rv = ClusterInfo::run_highest_window_request(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
2,
index + 1,
);
assert!(rv.is_none());
}
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn run_orphan() {
solana_logger::setup();
let recycler = PacketsRecycler::default();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let rv =
ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0);
assert!(rv.is_none());
// Create slots 1, 2, 3 with 5 shreds apiece
let (shreds, _) = make_many_slot_entries(1, 3, 5);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
// We don't have slot 4, so we don't know how to service this requeset
let rv =
ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5);
assert!(rv.is_none());
// For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively
// for this request
let rv: Vec<_> =
ClusterInfo::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5)
.expect("run_orphan packets")
.packets
.iter()
.map(|b| b.clone())
.collect();
let expected: Vec<_> = (1..=3)
.rev()
.map(|slot| {
let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
ClusterInfo::get_data_shred_as_packet(
&blockstore,
slot,
index,
&socketaddr_any!(),
)
.unwrap()
.unwrap()
})
.collect();
assert_eq!(rv, expected)
}
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
fn assert_in_range(x: u16, range: (u16, u16)) {
assert!(x >= range.0);
@ -2701,13 +2182,16 @@ mod tests {
}
fn test_split_messages(value: CrdsValue) {
const NUM_VALUES: usize = 30;
const NUM_VALUES: u64 = 30;
let value_size = value.size();
let expected_len = NUM_VALUES / (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1) as usize;
let msgs = vec![value; NUM_VALUES];
let num_values_per_payload = (MAX_PROTOCOL_PAYLOAD_SIZE / value_size).max(1);
// Expected len is the ceiling of the division
let expected_len = (NUM_VALUES + num_values_per_payload - 1) / num_values_per_payload;
let msgs = vec![value; NUM_VALUES as usize];
let split = ClusterInfo::split_gossip_messages(msgs);
assert!(split.len() <= expected_len);
assert!(split.len() as u64 <= expected_len);
}
#[test]
@ -2880,25 +2364,6 @@ mod tests {
- serialized_size(&PruneData::default()).unwrap(),
);
// make sure repairs are always smaller than the gossip messages
assert!(
max_protocol_size
> serialized_size(&Protocol::RequestWindowIndex(ContactInfo::default(), 0, 0))
.unwrap()
);
assert!(
max_protocol_size
> serialized_size(&Protocol::RequestHighestWindowIndex(
ContactInfo::default(),
0,
0
))
.unwrap()
);
assert!(
max_protocol_size
> serialized_size(&Protocol::RequestOrphan(ContactInfo::default(), 0)).unwrap()
);
// finally assert the header size estimation is correct
assert_eq!(MAX_PROTOCOL_HEADER_SIZE, max_protocol_size);
}

View File

@ -17,7 +17,7 @@ pub struct ContactInfo {
pub tvu: SocketAddr,
/// address to forward shreds to
pub tvu_forwards: SocketAddr,
/// address to send repairs to
/// address to send repair responses to
pub repair: SocketAddr,
/// transactions address
pub tpu: SocketAddr,
@ -29,6 +29,8 @@ pub struct ContactInfo {
pub rpc: SocketAddr,
/// websocket for JSON-RPC push notifications
pub rpc_pubsub: SocketAddr,
/// address to send repair requests to
pub serve_repair: SocketAddr,
/// latest wallclock picked
pub wallclock: u64,
/// node shred version
@ -85,6 +87,7 @@ impl Default for ContactInfo {
storage_addr: socketaddr_any!(),
rpc: socketaddr_any!(),
rpc_pubsub: socketaddr_any!(),
serve_repair: socketaddr_any!(),
wallclock: 0,
shred_version: 0,
}
@ -104,6 +107,7 @@ impl ContactInfo {
storage_addr: SocketAddr,
rpc: SocketAddr,
rpc_pubsub: SocketAddr,
serve_repair: SocketAddr,
now: u64,
) -> Self {
Self {
@ -117,6 +121,7 @@ impl ContactInfo {
storage_addr,
rpc,
rpc_pubsub,
serve_repair,
wallclock: now,
shred_version: 0,
}
@ -134,6 +139,7 @@ impl ContactInfo {
socketaddr!("127.0.0.1:1240"),
socketaddr!("127.0.0.1:1241"),
socketaddr!("127.0.0.1:1242"),
socketaddr!("127.0.0.1:1243"),
now,
)
}
@ -154,6 +160,7 @@ impl ContactInfo {
addr,
addr,
addr,
addr,
0,
)
}
@ -174,6 +181,7 @@ impl ContactInfo {
let repair = next_port(&bind_addr, 5);
let rpc_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PORT);
let rpc_pubsub_addr = SocketAddr::new(bind_addr.ip(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
let serve_repair = next_port(&bind_addr, 6);
Self::new(
pubkey,
gossip_addr,
@ -185,6 +193,7 @@ impl ContactInfo {
"0.0.0.0:0".parse().unwrap(),
rpc_addr,
rpc_pubsub_addr,
serve_repair,
timestamp(),
)
}
@ -209,6 +218,7 @@ impl ContactInfo {
daddr,
daddr,
daddr,
daddr,
timestamp(),
)
}
@ -267,6 +277,7 @@ mod tests {
assert!(ci.rpc_pubsub.ip().is_unspecified());
assert!(ci.tpu.ip().is_unspecified());
assert!(ci.storage_addr.ip().is_unspecified());
assert!(ci.serve_repair.ip().is_unspecified());
}
#[test]
fn test_multicast() {
@ -278,6 +289,7 @@ mod tests {
assert!(ci.rpc_pubsub.ip().is_multicast());
assert!(ci.tpu.ip().is_multicast());
assert!(ci.storage_addr.ip().is_multicast());
assert!(ci.serve_repair.ip().is_multicast());
}
#[test]
fn test_entry_point() {
@ -290,6 +302,7 @@ mod tests {
assert!(ci.rpc_pubsub.ip().is_unspecified());
assert!(ci.tpu.ip().is_unspecified());
assert!(ci.storage_addr.ip().is_unspecified());
assert!(ci.serve_repair.ip().is_unspecified());
}
#[test]
fn test_socketaddr() {
@ -302,7 +315,9 @@ mod tests {
assert_eq!(ci.rpc.port(), rpc_port::DEFAULT_RPC_PORT);
assert_eq!(ci.rpc_pubsub.port(), rpc_port::DEFAULT_RPC_PUBSUB_PORT);
assert!(ci.storage_addr.ip().is_unspecified());
assert_eq!(ci.serve_repair.port(), 16);
}
#[test]
fn replayed_data_new_with_socketaddr_with_pubkey() {
let keypair = Keypair::new();
@ -323,6 +338,9 @@ mod tests {
d1.rpc_pubsub,
socketaddr!(format!("127.0.0.1:{}", rpc_port::DEFAULT_RPC_PUBSUB_PORT))
);
assert_eq!(d1.tvu_forwards, socketaddr!("127.0.0.1:1238"));
assert_eq!(d1.repair, socketaddr!("127.0.0.1:1239"));
assert_eq!(d1.serve_repair, socketaddr!("127.0.0.1:1240"));
}
#[test]

View File

@ -6,7 +6,6 @@ use crate::streamer;
use rand::{thread_rng, Rng};
use solana_client::thin_client::{create_client, ThinClient};
use solana_ledger::bank_forks::BankForks;
use solana_ledger::blockstore::Blockstore;
use solana_perf::recycler::Recycler;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
@ -24,7 +23,6 @@ pub struct GossipService {
impl GossipService {
pub fn new(
cluster_info: &Arc<RwLock<ClusterInfo>>,
blockstore: Option<Arc<Blockstore>>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
gossip_socket: UdpSocket,
exit: &Arc<AtomicBool>,
@ -47,7 +45,6 @@ impl GossipService {
let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
let t_listen = ClusterInfo::listen(
cluster_info.clone(),
blockstore,
bank_forks.clone(),
request_receiver,
response_sender.clone(),
@ -283,8 +280,7 @@ fn make_gossip_node(
cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint));
}
let cluster_info = Arc::new(RwLock::new(cluster_info));
let gossip_service =
GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit);
let gossip_service = GossipService::new(&cluster_info.clone(), None, gossip_socket, &exit);
(gossip_service, ip_echo, cluster_info)
}
@ -303,7 +299,7 @@ mod tests {
let tn = Node::new_localhost();
let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone());
let c = Arc::new(RwLock::new(cluster_info));
let d = GossipService::new(&c, None, None, tn.sockets.gossip, &exit);
let d = GossipService::new(&c, None, tn.sockets.gossip, &exit);
exit.store(true, Ordering::Relaxed);
d.join().unwrap();
}

View File

@ -42,6 +42,8 @@ pub mod rpc_pubsub_service;
pub mod rpc_service;
pub mod rpc_subscriptions;
pub mod sendmmsg;
pub mod serve_repair;
pub mod serve_repair_service;
pub mod sigverify;
pub mod sigverify_shreds;
pub mod sigverify_stage;

View File

@ -1,6 +1,10 @@
//! The `repair_service` module implements the tools necessary to generate a thread which
//! regularly finds missing shreds in the ledger and sends repair requests for those shreds
use crate::{cluster_info::ClusterInfo, result::Result};
use crate::{
cluster_info::ClusterInfo,
result::Result,
serve_repair::{RepairType, ServeRepair},
};
use solana_ledger::{
bank_forks::BankForks,
blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta},
@ -30,23 +34,6 @@ pub enum RepairStrategy {
},
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepairType {
Orphan(Slot),
HighestShred(Slot, u64),
Shred(Slot, u64),
}
impl RepairType {
pub fn slot(&self) -> Slot {
match self {
RepairType::Orphan(slot) => *slot,
RepairType::HighestShred(slot, _) => *slot,
RepairType::Shred(slot, _) => *slot,
}
}
}
pub struct RepairSlotRange {
pub start: Slot,
pub end: Slot,
@ -96,6 +83,7 @@ impl RepairService {
cluster_info: &Arc<RwLock<ClusterInfo>>,
repair_strategy: RepairStrategy,
) {
let serve_repair = ServeRepair::new(cluster_info.clone());
let mut epoch_slots: BTreeSet<Slot> = BTreeSet::new();
let id = cluster_info.read().unwrap().id();
let mut current_root = 0;
@ -153,9 +141,7 @@ impl RepairService {
let reqs: Vec<_> = repairs
.into_iter()
.filter_map(|repair_request| {
cluster_info
.read()
.unwrap()
serve_repair
.repair_request(&repair_request)
.map(|result| (result, repair_request))
.ok()

676
core/src/serve_repair.rs Normal file
View File

@ -0,0 +1,676 @@
use crate::packet::limited_deserialize;
use crate::streamer::{PacketReceiver, PacketSender};
use crate::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::ContactInfo,
packet::Packet,
result::Result,
};
use bincode::serialize;
use rand::{thread_rng, Rng};
use solana_ledger::blockstore::Blockstore;
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug};
use solana_perf::packet::{Packets, PacketsRecycler};
use solana_sdk::{
clock::Slot,
signature::{Keypair, KeypairUtil},
timing::duration_as_ms,
};
use std::{
net::SocketAddr,
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
thread::{Builder, JoinHandle},
time::{Duration, Instant},
};
/// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepairType {
Orphan(Slot),
HighestShred(Slot, u64),
Shred(Slot, u64),
}
impl RepairType {
pub fn slot(&self) -> Slot {
match self {
RepairType::Orphan(slot) => *slot,
RepairType::HighestShred(slot, _) => *slot,
RepairType::Shred(slot, _) => *slot,
}
}
}
/// Window protocol messages
#[derive(Serialize, Deserialize, Debug)]
enum RepairProtocol {
WindowIndex(ContactInfo, u64, u64),
HighestWindowIndex(ContactInfo, u64, u64),
Orphan(ContactInfo, u64),
}
#[derive(Clone)]
pub struct ServeRepair {
/// set the keypair that will be used to sign repair responses
keypair: Arc<Keypair>,
my_info: ContactInfo,
cluster_info: Arc<RwLock<ClusterInfo>>,
}
impl ServeRepair {
/// Without a valid keypair gossip will not function. Only useful for tests.
pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self {
Self::new(Arc::new(RwLock::new(
ClusterInfo::new_with_invalid_keypair(contact_info),
)))
}
pub fn new(cluster_info: Arc<RwLock<ClusterInfo>>) -> Self {
let (keypair, my_info) = {
let r_cluster_info = cluster_info.read().unwrap();
(r_cluster_info.keypair.clone(), r_cluster_info.my_data())
};
Self {
keypair,
my_info,
cluster_info,
}
}
pub fn my_info(&self) -> &ContactInfo {
&self.my_info
}
pub fn keypair(&self) -> &Arc<Keypair> {
&self.keypair
}
fn get_repair_sender(request: &RepairProtocol) -> &ContactInfo {
match request {
RepairProtocol::WindowIndex(ref from, _, _) => from,
RepairProtocol::HighestWindowIndex(ref from, _, _) => from,
RepairProtocol::Orphan(ref from, _) => from,
}
}
fn handle_repair(
me: &Arc<RwLock<Self>>,
recycler: &PacketsRecycler,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
request: RepairProtocol,
) -> Option<Packets> {
let now = Instant::now();
//TODO verify from is signed
let my_id = me.read().unwrap().keypair.pubkey();
let from = Self::get_repair_sender(&request);
if from.id == my_id {
warn!(
"{}: Ignored received repair request from ME {}",
my_id, from.id,
);
inc_new_counter_debug!("serve_repair-handle-repair--eq", 1);
return None;
}
let (res, label) = {
match &request {
RepairProtocol::WindowIndex(from, slot, shred_index) => {
inc_new_counter_debug!("serve_repair-request-window-index", 1);
(
Self::run_window_request(
recycler,
from,
&from_addr,
blockstore,
&me.read().unwrap().my_info,
*slot,
*shred_index,
),
"WindowIndex",
)
}
RepairProtocol::HighestWindowIndex(_, slot, highest_index) => {
inc_new_counter_debug!("serve_repair-request-highest-window-index", 1);
(
Self::run_highest_window_request(
recycler,
&from_addr,
blockstore,
*slot,
*highest_index,
),
"HighestWindowIndex",
)
}
RepairProtocol::Orphan(_, slot) => {
inc_new_counter_debug!("serve_repair-request-orphan", 1);
(
Self::run_orphan(
recycler,
&from_addr,
blockstore,
*slot,
MAX_ORPHAN_REPAIR_RESPONSES,
),
"Orphan",
)
}
}
};
trace!("{}: received repair request: {:?}", my_id, request);
Self::report_time_spent(label, &now.elapsed(), "");
res
}
fn report_time_spent(label: &str, time: &Duration, extra: &str) {
let count = duration_as_ms(time);
if count > 5 {
info!("{} took: {} ms {}", label, count, extra);
}
}
/// Process messages from the network
fn run_listen(
obj: &Arc<RwLock<Self>>,
recycler: &PacketsRecycler,
blockstore: Option<&Arc<Blockstore>>,
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
let reqs = requests_receiver.recv_timeout(timeout)?;
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender);
Ok(())
}
pub fn listen(
me: Arc<RwLock<Self>>,
blockstore: Option<Arc<Blockstore>>,
requests_receiver: PacketReceiver,
response_sender: PacketSender,
exit: &Arc<AtomicBool>,
) -> JoinHandle<()> {
let exit = exit.clone();
let recycler = PacketsRecycler::default();
Builder::new()
.name("solana-repair-listen".to_string())
.spawn(move || loop {
let e = Self::run_listen(
&me,
&recycler,
blockstore.as_ref(),
&requests_receiver,
&response_sender,
);
if exit.load(Ordering::Relaxed) {
return;
}
if e.is_err() {
info!("repair listener error: {:?}", e);
}
thread_mem_usage::datapoint("solana-repair-listen");
})
.unwrap()
}
fn handle_packets(
me: &Arc<RwLock<Self>>,
recycler: &PacketsRecycler,
blockstore: Option<&Arc<Blockstore>>,
packets: Packets,
response_sender: &PacketSender,
) {
// iter over the packets, collect pulls separately and process everything else
let allocated = thread_mem_usage::Allocatedp::default();
packets.packets.iter().for_each(|packet| {
let start = allocated.get();
let from_addr = packet.meta.addr();
limited_deserialize(&packet.data[..packet.meta.size])
.into_iter()
.for_each(|request| {
let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request);
if let Some(rsp) = rsp {
let _ignore_disconnect = response_sender.send(rsp);
}
});
datapoint_debug!(
"solana-serve-repair-memory",
("serve_repair", (allocated.get() - start) as i64, i64),
);
});
}
fn window_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
let req = RepairProtocol::WindowIndex(self.my_info.clone(), slot, shred_index);
let out = serialize(&req)?;
Ok(out)
}
fn window_highest_index_request_bytes(&self, slot: Slot, shred_index: u64) -> Result<Vec<u8>> {
let req = RepairProtocol::HighestWindowIndex(self.my_info.clone(), slot, shred_index);
let out = serialize(&req)?;
Ok(out)
}
fn orphan_bytes(&self, slot: Slot) -> Result<Vec<u8>> {
let req = RepairProtocol::Orphan(self.my_info.clone(), slot);
let out = serialize(&req)?;
Ok(out)
}
pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location
let valid: Vec<_> = self
.cluster_info
.read()
.unwrap()
.repair_peers(repair_request.slot());
if valid.is_empty() {
return Err(ClusterInfoError::NoPeers.into());
}
let n = thread_rng().gen::<usize>() % valid.len();
let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port
let out = self.map_repair_request(repair_request)?;
Ok((addr, out))
}
pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
match repair_request {
RepairType::Shred(slot, shred_index) => {
datapoint_debug!(
"serve_repair-repair",
("repair-slot", *slot, i64),
("repair-ix", *shred_index, i64)
);
Ok(self.window_index_request_bytes(*slot, *shred_index)?)
}
RepairType::HighestShred(slot, shred_index) => {
datapoint_debug!(
"serve_repair-repair_highest",
("repair-highest-slot", *slot, i64),
("repair-highest-ix", *shred_index, i64)
);
Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
}
RepairType::Orphan(slot) => {
datapoint_debug!("serve_repair-repair_orphan", ("repair-orphan", *slot, i64));
Ok(self.orphan_bytes(*slot)?)
}
}
}
fn run_window_request(
recycler: &PacketsRecycler,
from: &ContactInfo,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
me: &ContactInfo,
slot: Slot,
shred_index: u64,
) -> Option<Packets> {
if let Some(blockstore) = blockstore {
// Try to find the requested index in one of the slots
let packet = Self::get_data_shred_as_packet(blockstore, slot, shred_index, from_addr);
if let Ok(Some(packet)) = packet {
inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
return Some(Packets::new_with_recycler_data(
recycler,
"run_window_request",
vec![packet],
));
}
}
inc_new_counter_debug!("serve_repair-window-request-fail", 1);
trace!(
"{}: failed WindowIndex {} {} {}",
me.id,
from.id,
slot,
shred_index,
);
None
}
fn run_highest_window_request(
recycler: &PacketsRecycler,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
slot: Slot,
highest_index: u64,
) -> Option<Packets> {
let blockstore = blockstore?;
// Try to find the requested index in one of the slots
let meta = blockstore.meta(slot).ok()??;
if meta.received > highest_index {
// meta.received must be at least 1 by this point
let packet =
Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr)
.ok()??;
return Some(Packets::new_with_recycler_data(
recycler,
"run_highest_window_request",
vec![packet],
));
}
None
}
fn run_orphan(
recycler: &PacketsRecycler,
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
mut slot: Slot,
max_responses: usize,
) -> Option<Packets> {
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
if let Some(blockstore) = blockstore {
// Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blockstore.meta(slot) {
if meta.received == 0 {
break;
}
let packet =
Self::get_data_shred_as_packet(blockstore, slot, meta.received - 1, from_addr);
if let Ok(Some(packet)) = packet {
res.packets.push(packet);
}
if meta.is_parent_set() && res.packets.len() <= max_responses {
slot = meta.parent_slot;
} else {
break;
}
}
}
if res.is_empty() {
return None;
}
Some(res)
}
fn get_data_shred_as_packet(
blockstore: &Arc<Blockstore>,
slot: Slot,
shred_index: u64,
dest: &SocketAddr,
) -> Result<Option<Packet>> {
let data = blockstore.get_data_shred(slot, shred_index)?;
Ok(data.map(|data| {
let mut packet = Packet::default();
packet.meta.size = data.len();
packet.meta.set_addr(dest);
packet.data.copy_from_slice(&data);
packet
}))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::result::Error;
use solana_ledger::get_tmp_ledger_path;
use solana_ledger::{
blockstore::make_many_slot_entries,
blockstore_processor::fill_blockstore_slot_with_ticks,
shred::{
max_ticks_per_n_shreds, CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader,
},
};
use solana_sdk::{hash::Hash, pubkey::Pubkey, timing::timestamp};
/// test run_window_requestwindow requests respond with the right shred, and do not overrun
#[test]
fn run_highest_window_request() {
let recycler = PacketsRecycler::default();
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let rv = ServeRepair::run_highest_window_request(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
0,
0,
);
assert!(rv.is_none());
let _ = fill_blockstore_slot_with_ticks(
&blockstore,
max_ticks_per_n_shreds(1) + 1,
2,
1,
Hash::default(),
);
let rv = ServeRepair::run_highest_window_request(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
2,
1,
);
let rv: Vec<Shred> = rv
.expect("packets")
.packets
.into_iter()
.filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
.collect();
assert!(!rv.is_empty());
let index = blockstore.meta(2).unwrap().unwrap().received - 1;
assert_eq!(rv[0].index(), index as u32);
assert_eq!(rv[0].slot(), 2);
let rv = ServeRepair::run_highest_window_request(
&recycler,
&socketaddr_any!(),
Some(&blockstore),
2,
index + 1,
);
assert!(rv.is_none());
}
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
/// test window requests respond with the right shred, and do not overrun
#[test]
fn run_window_request() {
let recycler = PacketsRecycler::default();
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let me = ContactInfo::new(
&Pubkey::new_rand(),
socketaddr!("127.0.0.1:1234"),
socketaddr!("127.0.0.1:1235"),
socketaddr!("127.0.0.1:1236"),
socketaddr!("127.0.0.1:1237"),
socketaddr!("127.0.0.1:1238"),
socketaddr!("127.0.0.1:1239"),
socketaddr!("127.0.0.1:1240"),
socketaddr!("127.0.0.1:1241"),
socketaddr!("127.0.0.1:1242"),
socketaddr!("127.0.0.1:1243"),
0,
);
let rv = ServeRepair::run_window_request(
&recycler,
&me,
&socketaddr_any!(),
Some(&blockstore),
&me,
0,
0,
);
assert!(rv.is_none());
let mut common_header = ShredCommonHeader::default();
common_header.slot = 2;
common_header.index = 1;
let mut data_header = DataShredHeader::default();
data_header.parent_offset = 1;
let shred_info = Shred::new_empty_from_header(
common_header,
data_header,
CodingShredHeader::default(),
);
blockstore
.insert_shreds(vec![shred_info], None, false)
.expect("Expect successful ledger write");
let rv = ServeRepair::run_window_request(
&recycler,
&me,
&socketaddr_any!(),
Some(&blockstore),
&me,
2,
1,
);
assert!(!rv.is_none());
let rv: Vec<Shred> = rv
.expect("packets")
.packets
.into_iter()
.filter_map(|b| Shred::new_from_serialized_shred(b.data.to_vec()).ok())
.collect();
assert_eq!(rv[0].index(), 1);
assert_eq!(rv[0].slot(), 2);
}
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn window_index_request() {
let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me)));
let serve_repair = ServeRepair::new(cluster_info.clone());
let rv = serve_repair.repair_request(&RepairType::Shred(0, 0));
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
let serve_repair_addr = socketaddr!([127, 0, 0, 1], 1243);
let nxt = ContactInfo::new(
&Pubkey::new_rand(),
socketaddr!([127, 0, 0, 1], 1234),
socketaddr!([127, 0, 0, 1], 1235),
socketaddr!([127, 0, 0, 1], 1236),
socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238),
socketaddr!([127, 0, 0, 1], 1239),
socketaddr!([127, 0, 0, 1], 1240),
socketaddr!([127, 0, 0, 1], 1241),
socketaddr!([127, 0, 0, 1], 1242),
serve_repair_addr,
0,
);
cluster_info.write().unwrap().insert_info(nxt.clone());
let rv = serve_repair
.repair_request(&RepairType::Shred(0, 0))
.unwrap();
assert_eq!(nxt.serve_repair, serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair);
let serve_repair_addr2 = socketaddr!([127, 0, 0, 2], 1243);
let nxt = ContactInfo::new(
&Pubkey::new_rand(),
socketaddr!([127, 0, 0, 1], 1234),
socketaddr!([127, 0, 0, 1], 1235),
socketaddr!([127, 0, 0, 1], 1236),
socketaddr!([127, 0, 0, 1], 1237),
socketaddr!([127, 0, 0, 1], 1238),
socketaddr!([127, 0, 0, 1], 1239),
socketaddr!([127, 0, 0, 1], 1240),
socketaddr!([127, 0, 0, 1], 1241),
socketaddr!([127, 0, 0, 1], 1242),
serve_repair_addr2,
0,
);
cluster_info.write().unwrap().insert_info(nxt);
let mut one = false;
let mut two = false;
while !one || !two {
//this randomly picks an option, so eventually it should pick both
let rv = serve_repair
.repair_request(&RepairType::Shred(0, 0))
.unwrap();
if rv.0 == serve_repair_addr {
one = true;
}
if rv.0 == serve_repair_addr2 {
two = true;
}
}
assert!(one && two);
}
#[test]
fn run_orphan() {
solana_logger::setup();
let recycler = PacketsRecycler::default();
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let rv =
ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 2, 0);
assert!(rv.is_none());
// Create slots 1, 2, 3 with 5 shreds apiece
let (shreds, _) = make_many_slot_entries(1, 3, 5);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
// We don't have slot 4, so we don't know how to service this requeset
let rv =
ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 4, 5);
assert!(rv.is_none());
// For slot 3, we should return the highest shreds from slots 3, 2, 1 respectively
// for this request
let rv: Vec<_> =
ServeRepair::run_orphan(&recycler, &socketaddr_any!(), Some(&blockstore), 3, 5)
.expect("run_orphan packets")
.packets
.iter()
.map(|b| b.clone())
.collect();
let expected: Vec<_> = (1..=3)
.rev()
.map(|slot| {
let index = blockstore.meta(slot).unwrap().unwrap().received - 1;
ServeRepair::get_data_shred_as_packet(
&blockstore,
slot,
index,
&socketaddr_any!(),
)
.unwrap()
.unwrap()
})
.collect();
assert_eq!(rv, expected)
}
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
}

View File

@ -0,0 +1,57 @@
use crate::serve_repair::ServeRepair;
use crate::streamer;
use solana_ledger::blockstore::Blockstore;
use solana_perf::recycler::Recycler;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::{self, JoinHandle};
pub struct ServeRepairService {
thread_hdls: Vec<JoinHandle<()>>,
}
impl ServeRepairService {
pub fn new(
serve_repair: &Arc<RwLock<ServeRepair>>,
blockstore: Option<Arc<Blockstore>>,
serve_repair_socket: UdpSocket,
exit: &Arc<AtomicBool>,
) -> Self {
let (request_sender, request_receiver) = channel();
let serve_repair_socket = Arc::new(serve_repair_socket);
trace!(
"ServeRepairService: id: {}, listening on: {:?}",
&serve_repair.read().unwrap().my_info().id,
serve_repair_socket.local_addr().unwrap()
);
let t_receiver = streamer::receiver(
serve_repair_socket.clone(),
&exit,
request_sender,
Recycler::default(),
"serve_repair_receiver",
);
let (response_sender, response_receiver) = channel();
let t_responder =
streamer::responder("serve-repairs", serve_repair_socket, response_receiver);
let t_listen = ServeRepair::listen(
serve_repair.clone(),
blockstore,
request_receiver,
response_sender,
exit,
);
let thread_hdls = vec![t_receiver, t_responder, t_listen];
Self { thread_hdls }
}
pub fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
}
}

View File

@ -12,6 +12,8 @@ use crate::{
rpc_pubsub_service::PubSubService,
rpc_service::JsonRpcService,
rpc_subscriptions::RpcSubscriptions,
serve_repair::ServeRepair,
serve_repair_service::ServeRepairService,
sigverify,
storage_stage::StorageState,
tpu::Tpu,
@ -121,6 +123,7 @@ pub struct Validator {
rpc_service: Option<(JsonRpcService, PubSubService)>,
transaction_status_service: Option<TransactionStatusService>,
gossip_service: GossipService,
serve_repair_service: ServeRepairService,
poh_recorder: Arc<Mutex<PohRecorder>>,
poh_service: PohService,
tpu: Tpu,
@ -302,12 +305,19 @@ impl Validator {
let gossip_service = GossipService::new(
&cluster_info,
Some(blockstore.clone()),
Some(bank_forks.clone()),
node.sockets.gossip,
&exit,
);
let serve_repair = Arc::new(RwLock::new(ServeRepair::new(cluster_info.clone())));
let serve_repair_service = ServeRepairService::new(
&serve_repair,
Some(blockstore.clone()),
node.sockets.serve_repair,
&exit,
);
// Insert the entrypoint info, should only be None if this node
// is the bootstrap validator
if let Some(entrypoint_info) = entrypoint_info_option {
@ -403,6 +413,7 @@ impl Validator {
Self {
id,
gossip_service,
serve_repair_service,
rpc_service,
transaction_status_service,
tpu,
@ -463,6 +474,7 @@ impl Validator {
}
self.gossip_service.join()?;
self.serve_repair_service.join()?;
self.tpu.join()?;
self.tvu.join()?;
self.ip_echo_server.shutdown_now();

View File

@ -21,8 +21,7 @@ fn test_node(exit: &Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, GossipService
test_node.info.clone(),
keypair,
)));
let gossip_service =
GossipService::new(&cluster_info, None, None, test_node.sockets.gossip, exit);
let gossip_service = GossipService::new(&cluster_info, None, test_node.sockets.gossip, exit);
let _ = cluster_info.read().unwrap().my_data();
(
cluster_info,

View File

@ -6,6 +6,7 @@ use solana_core::{
cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE},
contact_info::ContactInfo,
gossip_service::discover_cluster,
serve_repair::ServeRepair,
storage_stage::SLOTS_PER_TURN_TEST,
validator::ValidatorConfig,
};
@ -61,10 +62,11 @@ fn run_archiver_startup_basic(num_nodes: usize, num_archivers: usize) {
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
cluster_nodes[0].clone(),
)));
let serve_repair = ServeRepair::new(cluster_info);
let path = get_tmp_ledger_path!();
let blockstore = Arc::new(Blockstore::open(&path).unwrap());
Archiver::download_from_archiver(
&cluster_info,
&serve_repair,
&archiver_info,
&blockstore,
slots_per_segment,

View File

@ -6903,7 +6903,7 @@
"renderer": "flot",
"seriesOverrides": [
{
"alias": "cluster_info-repair_highest.ix",
"alias": "serve_repair-repair_highest.ix",
"yaxis": 2
}
],
@ -6928,7 +6928,7 @@
],
"orderByTime": "ASC",
"policy": "default",
"query": "SELECT last(\"repair-highest-slot\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"query": "SELECT last(\"repair-highest-slot\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "C",
"resultFormat": "time_series",
@ -6965,7 +6965,7 @@
],
"orderByTime": "ASC",
"policy": "default",
"query": "SELECT last(\"repair-highest-ix\") AS \"ix\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"query": "SELECT last(\"repair-highest-ix\") AS \"ix\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_highest\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "A",
"resultFormat": "time_series",
@ -7064,7 +7064,7 @@
"renderer": "flot",
"seriesOverrides": [
{
"alias": "cluster_info-repair.repair-ix",
"alias": "serve_repair-repair.repair-ix",
"yaxis": 2
}
],
@ -7089,7 +7089,7 @@
],
"orderByTime": "ASC",
"policy": "default",
"query": "SELECT last(\"repair-ix\") AS \"repair-ix\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"query": "SELECT last(\"repair-ix\") AS \"repair-ix\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "C",
"resultFormat": "time_series",
@ -7126,7 +7126,7 @@
],
"orderByTime": "ASC",
"policy": "default",
"query": "SELECT last(\"repair-slot\") AS \"repair-slot\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"query": "SELECT last(\"repair-slot\") AS \"repair-slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "A",
"resultFormat": "time_series",
@ -7245,7 +7245,7 @@
],
"orderByTime": "ASC",
"policy": "default",
"query": "SELECT last(\"repair-orphan\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair_orphan\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"query": "SELECT last(\"repair-orphan\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair_orphan\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "C",
"resultFormat": "time_series",

View File

@ -213,7 +213,6 @@ fn get_rpc_addr(
let gossip_service = GossipService::new(
&cluster_info.clone(),
None,
None,
node.sockets.gossip.try_clone().unwrap(),
&gossip_exit_flag,
);