Add new gossip structure for supporting repairs (#4205)

* Add Epoch Slots to gossip

* Add new gossip structure to support Repair

* remove unnecessary clones

* Setup dummy fast repair in repair_service

* PR comments
This commit is contained in:
carllin 2019-05-08 13:50:32 -07:00 committed by GitHub
parent 965c1e0000
commit b8fd51e97d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 161 additions and 59 deletions

View File

@ -18,7 +18,7 @@ use crate::contact_info::ContactInfo;
use crate::crds_gossip::CrdsGossip;
use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote};
use crate::crds_value::{CrdsValue, CrdsValueLabel, EpochSlots, Vote};
use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
use crate::repair_service::RepairType;
use crate::result::Result;
@ -41,6 +41,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::timing::{duration_as_ms, timestamp};
use solana_sdk::transaction::Transaction;
use std::cmp::min;
use std::collections::HashSet;
use std::fmt;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
@ -200,7 +201,7 @@ impl ClusterInfo {
let mut entry = CrdsValue::ContactInfo(my_data);
entry.sign(&self.keypair);
self.gossip.refresh_push_active_set(stakes);
self.gossip.process_push_message(&[entry], now);
self.gossip.process_push_message(vec![entry], now);
}
// TODO kill insert_info, only used by tests
@ -296,12 +297,19 @@ impl ClusterInfo {
self.gossip_leader_id = *leader_id;
}
pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: HashSet<u64>) {
let now = timestamp();
let mut entry = CrdsValue::EpochSlots(EpochSlots::new(id, root, slots, now));
entry.sign(&self.keypair);
self.gossip.process_push_message(vec![entry], now);
}
pub fn push_vote(&mut self, vote: Transaction) {
let now = timestamp();
let vote = Vote::new(&self.id(), vote, now);
let mut entry = CrdsValue::Vote(vote);
entry.sign(&self.keypair);
self.gossip.process_push_message(&[entry], now);
self.gossip.process_push_message(vec![entry], now);
}
/// Get votes in the crds
@ -1133,7 +1141,7 @@ impl ClusterInfo {
fn handle_push_message(
me: &Arc<RwLock<Self>>,
from: &Pubkey,
data: &[CrdsValue],
data: Vec<CrdsValue>,
) -> Vec<SharedBlob> {
let self_id = me.read().unwrap().gossip.id;
inc_new_counter_info!("cluster_info-push_message", 1, 0, 1000);
@ -1141,7 +1149,7 @@ impl ClusterInfo {
.write()
.unwrap()
.gossip
.process_push_message(&data, timestamp());
.process_push_message(data, timestamp());
if !prunes.is_empty() {
inc_new_counter_info!("cluster_info-push_message-prunes", prunes.len());
let ci = me.read().unwrap().lookup(from).cloned();
@ -1294,7 +1302,7 @@ impl ClusterInfo {
}
ret
});
Self::handle_push_message(me, &from, &data)
Self::handle_push_message(me, &from, data)
}
Protocol::PruneMessage(from, data) => {
if data.verify() {

View File

@ -40,20 +40,20 @@ impl CrdsGossip {
self.id = *id;
}
/// process a push message to the network
pub fn process_push_message(&mut self, values: &[CrdsValue], now: u64) -> Vec<Pubkey> {
pub fn process_push_message(&mut self, values: Vec<CrdsValue>, now: u64) -> Vec<Pubkey> {
let labels: Vec<_> = values.iter().map(CrdsValue::label).collect();
let results: Vec<_> = values
.iter()
.map(|val| {
self.push
.process_push_message(&mut self.crds, val.clone(), now)
})
.into_iter()
.map(|val| self.push.process_push_message(&mut self.crds, val, now))
.collect();
results
.into_iter()
.zip(values)
.zip(labels)
.filter_map(|(r, d)| {
if r == Err(CrdsGossipError::PushMessagePrune) {
Some(d.label().pubkey())
Some(d.pubkey())
} else if let Ok(Some(val)) = r {
self.pull
.record_old_hash(val.value_hash, val.local_timestamp);

View File

@ -3,6 +3,7 @@ use bincode::serialize;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, Signable, Signature};
use solana_sdk::transaction::Transaction;
use std::collections::HashSet;
use std::fmt;
/// CrdsValue that is replicated across the cluster
@ -12,6 +13,58 @@ pub enum CrdsValue {
ContactInfo(ContactInfo),
/// * Merge Strategy - Latest wallclock is picked
Vote(Vote),
/// * Merge Strategy - Latest wallclock is picked
EpochSlots(EpochSlots),
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct EpochSlots {
pub from: Pubkey,
pub root: u64,
pub slots: HashSet<u64>,
pub signature: Signature,
pub wallclock: u64,
}
impl EpochSlots {
pub fn new(from: Pubkey, root: u64, slots: HashSet<u64>, wallclock: u64) -> Self {
Self {
from,
root,
slots,
signature: Signature::default(),
wallclock,
}
}
}
impl Signable for EpochSlots {
fn pubkey(&self) -> Pubkey {
self.from
}
fn signable_data(&self) -> Vec<u8> {
#[derive(Serialize)]
struct SignData<'a> {
root: u64,
slots: &'a HashSet<u64>,
wallclock: u64,
}
let data = SignData {
root: self.root,
slots: &self.slots,
wallclock: self.wallclock,
};
serialize(&data).expect("unable to serialize EpochSlots")
}
fn get_signature(&self) -> Signature {
self.signature
}
fn set_signature(&mut self, signature: Signature) {
self.signature = signature;
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@ -22,6 +75,17 @@ pub struct Vote {
pub wallclock: u64,
}
impl Vote {
pub fn new(from: &Pubkey, transaction: Transaction, wallclock: u64) -> Self {
Self {
from: *from,
transaction,
signature: Signature::default(),
wallclock,
}
}
}
impl Signable for Vote {
fn pubkey(&self) -> Pubkey {
self.from
@ -29,12 +93,12 @@ impl Signable for Vote {
fn signable_data(&self) -> Vec<u8> {
#[derive(Serialize)]
struct SignData {
transaction: Transaction,
struct SignData<'a> {
transaction: &'a Transaction,
wallclock: u64,
}
let data = SignData {
transaction: self.transaction.clone(),
transaction: &self.transaction,
wallclock: self.wallclock,
};
serialize(&data).expect("unable to serialize Vote")
@ -55,6 +119,7 @@ impl Signable for Vote {
pub enum CrdsValueLabel {
ContactInfo(Pubkey),
Vote(Pubkey),
EpochSlots(Pubkey),
}
impl fmt::Display for CrdsValueLabel {
@ -62,6 +127,7 @@ impl fmt::Display for CrdsValueLabel {
match self {
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
CrdsValueLabel::Vote(_) => write!(f, "Vote({})", self.pubkey()),
CrdsValueLabel::EpochSlots(_) => write!(f, "EpochSlots({})", self.pubkey()),
}
}
}
@ -71,17 +137,7 @@ impl CrdsValueLabel {
match self {
CrdsValueLabel::ContactInfo(p) => *p,
CrdsValueLabel::Vote(p) => *p,
}
}
}
impl Vote {
pub fn new(from: &Pubkey, transaction: Transaction, wallclock: u64) -> Self {
Vote {
from: *from,
transaction,
signature: Signature::default(),
wallclock,
CrdsValueLabel::EpochSlots(p) => *p,
}
}
}
@ -94,6 +150,7 @@ impl CrdsValue {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.wallclock,
CrdsValue::Vote(vote) => vote.wallclock,
CrdsValue::EpochSlots(vote) => vote.wallclock,
}
}
pub fn label(&self) -> CrdsValueLabel {
@ -102,6 +159,7 @@ impl CrdsValue {
CrdsValueLabel::ContactInfo(contact_info.pubkey())
}
CrdsValue::Vote(vote) => CrdsValueLabel::Vote(vote.pubkey()),
CrdsValue::EpochSlots(slots) => CrdsValueLabel::EpochSlots(slots.pubkey()),
}
}
pub fn contact_info(&self) -> Option<&ContactInfo> {
@ -116,11 +174,18 @@ impl CrdsValue {
_ => None,
}
}
pub fn epoch_slots(&self) -> Option<&EpochSlots> {
match self {
CrdsValue::EpochSlots(slots) => Some(slots),
_ => None,
}
}
/// Return all the possible labels for a record identified by Pubkey.
pub fn record_labels(key: &Pubkey) -> [CrdsValueLabel; 2] {
pub fn record_labels(key: &Pubkey) -> [CrdsValueLabel; 3] {
[
CrdsValueLabel::ContactInfo(*key),
CrdsValueLabel::Vote(*key),
CrdsValueLabel::EpochSlots(*key),
]
}
}
@ -130,12 +195,15 @@ impl Signable for CrdsValue {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.sign(keypair),
CrdsValue::Vote(vote) => vote.sign(keypair),
CrdsValue::EpochSlots(epoch_slots) => epoch_slots.sign(keypair),
};
}
fn verify(&self) -> bool {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.verify(),
CrdsValue::Vote(vote) => vote.verify(),
CrdsValue::EpochSlots(epoch_slots) => epoch_slots.verify(),
}
}
@ -143,6 +211,7 @@ impl Signable for CrdsValue {
match self {
CrdsValue::ContactInfo(contact_info) => contact_info.pubkey(),
CrdsValue::Vote(vote) => vote.pubkey(),
CrdsValue::EpochSlots(epoch_slots) => epoch_slots.pubkey(),
}
}
@ -169,12 +238,13 @@ mod test {
#[test]
fn test_labels() {
let mut hits = [false; 2];
let mut hits = [false; 3];
// this method should cover all the possible labels
for v in &CrdsValue::record_labels(&Pubkey::default()) {
match v {
CrdsValueLabel::ContactInfo(_) => hits[0] = true,
CrdsValueLabel::Vote(_) => hits[1] = true,
CrdsValueLabel::EpochSlots(_) => hits[2] = true,
}
}
assert!(hits.iter().all(|x| *x));
@ -190,6 +260,11 @@ mod test {
assert_eq!(v.wallclock(), 0);
let key = v.clone().vote().unwrap().from;
assert_eq!(v.label(), CrdsValueLabel::Vote(key));
let v = CrdsValue::EpochSlots(EpochSlots::new(Pubkey::default(), 0, HashSet::new(), 0));
assert_eq!(v.wallclock(), 0);
let key = v.clone().epoch_slots().unwrap().from;
assert_eq!(v.label(), CrdsValueLabel::EpochSlots(key));
}
#[test]
fn test_signature() {
@ -200,6 +275,13 @@ mod test {
verify_signatures(&mut v, &keypair, &wrong_keypair);
v = CrdsValue::Vote(Vote::new(&keypair.pubkey(), test_tx(), timestamp()));
verify_signatures(&mut v, &keypair, &wrong_keypair);
v = CrdsValue::EpochSlots(EpochSlots::new(
keypair.pubkey(),
0,
HashSet::new(),
timestamp(),
));
verify_signatures(&mut v, &keypair, &wrong_keypair);
}
fn verify_signatures(

View File

@ -6,6 +6,8 @@ use crate::cluster_info::ClusterInfo;
use crate::result::Result;
use crate::service::Service;
use solana_metrics::{influxdb, submit};
use solana_sdk::pubkey::Pubkey;
use std::collections::HashSet;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
@ -60,6 +62,30 @@ pub struct RepairService {
}
impl RepairService {
pub fn new(
blocktree: Arc<Blocktree>,
exit: &Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
repair_slot_range: Option<RepairSlotRange>,
) -> Self {
let exit = exit.clone();
let t_repair = Builder::new()
.name("solana-repair-service".to_string())
.spawn(move || {
Self::run(
&blocktree,
exit,
&repair_socket,
&cluster_info,
repair_slot_range,
)
})
.unwrap();
RepairService { t_repair }
}
fn run(
blocktree: &Arc<Blocktree>,
exit: Arc<AtomicBool>,
@ -68,6 +94,7 @@ impl RepairService {
repair_slot_range: Option<RepairSlotRange>,
) {
let mut repair_info = RepairInfo::new();
let epoch_slots: HashSet<u64> = HashSet::new();
let id = cluster_info.read().unwrap().id();
loop {
if exit.load(Ordering::Relaxed) {
@ -84,6 +111,7 @@ impl RepairService {
repair_slot_range,
)
} else {
Self::update_fast_repair(id, &epoch_slots, &cluster_info);
Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH)
}
};
@ -129,30 +157,6 @@ impl RepairService {
}
}
pub fn new(
blocktree: Arc<Blocktree>,
exit: &Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
repair_slot_range: Option<RepairSlotRange>,
) -> Self {
let exit = exit.clone();
let t_repair = Builder::new()
.name("solana-repair-service".to_string())
.spawn(move || {
Self::run(
&blocktree,
exit,
&repair_socket,
&cluster_info,
repair_slot_range,
)
})
.unwrap();
RepairService { t_repair }
}
fn generate_repairs_in_range(
blocktree: &Blocktree,
max_repairs: usize,
@ -266,6 +270,14 @@ impl RepairService {
}
}
}
fn update_fast_repair(id: Pubkey, slots: &HashSet<u64>, cluster_info: &RwLock<ClusterInfo>) {
let root = 0;
cluster_info
.write()
.unwrap()
.push_epoch_slots(id, root, slots.clone());
}
}
impl Service for RepairService {

View File

@ -125,7 +125,7 @@ fn network_simulator(network: &mut Network) {
.and_then(|v| v.contact_info().cloned())
.unwrap();
m.wallclock = now;
node.process_push_message(&[CrdsValue::ContactInfo(m.clone())], now);
node.process_push_message(vec![CrdsValue::ContactInfo(m)], now);
});
// push for a bit
let (queue_size, bytes_tx) = network_run_push(network, start, end);
@ -170,18 +170,18 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
})
.collect();
let transfered: Vec<_> = requests
.par_iter()
.into_par_iter()
.map(|(from, peers, msgs)| {
let mut bytes: usize = 0;
let mut delivered: usize = 0;
let mut num_msgs: usize = 0;
let mut prunes: usize = 0;
for to in peers {
bytes += serialized_size(msgs).unwrap() as usize;
bytes += serialized_size(&msgs).unwrap() as usize;
num_msgs += 1;
let rsps = network
.get(&to)
.map(|node| node.lock().unwrap().process_push_message(&msgs, now))
.map(|node| node.lock().unwrap().process_push_message(msgs.clone(), now))
.unwrap();
bytes += serialized_size(&rsps).unwrap() as usize;
prunes += rsps.len();
@ -191,7 +191,7 @@ fn network_run_push(network: &mut Network, start: usize, end: usize) -> (usize,
let mut node = node.lock().unwrap();
let destination = node.id;
let now = timestamp();
node.process_prune_msg(&*to, &destination, &rsps, now, now)
node.process_prune_msg(&to, &destination, &rsps, now, now)
.unwrap()
})
.unwrap();