Fix repair (#3581)

Add DetachedHeads repair protocol

Add DetachedHeads repair test

Repair starting from root
carllin 2019-04-06 19:41:22 -07:00 committed by GitHub
parent 03da63b41b
commit 20aa4434e2
9 changed files with 343 additions and 319 deletions

View File

@ -28,7 +28,7 @@ use std::fs;
use std::io;
use std::rc::Rc;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
mod db;
@ -105,6 +105,10 @@ impl SlotMeta {
self.consumed == self.last_index + 1
}
pub fn is_parent_set(&self) -> bool {
self.parent_slot != std::u64::MAX
}
fn new(slot: u64, parent_slot: u64) -> Self {
SlotMeta {
slot,
@ -124,8 +128,9 @@ pub struct Blocktree {
meta_cf: LedgerColumn<cf::SlotMeta>,
data_cf: LedgerColumn<cf::Data>,
erasure_cf: LedgerColumn<cf::Coding>,
detached_heads_cf: LedgerColumn<cf::DetachedHeads>,
orphans_cf: LedgerColumn<cf::Orphans>,
pub new_blobs_signals: Vec<SyncSender<bool>>,
pub root_slot: RwLock<u64>,
}
// Column family for metadata about a leader slot
@ -134,8 +139,8 @@ pub const META_CF: &str = "meta";
pub const DATA_CF: &str = "data";
// Column family for erasure data
pub const ERASURE_CF: &str = "erasure";
// Column family for detached heads data
pub const DETACHED_HEADS_CF: &str = "detached_heads";
// Column family for orphans data
pub const ORPHANS_CF: &str = "orphans";
impl Blocktree {
/// Opens a Ledger in directory, provides "infinite" window of blobs
@ -157,16 +162,19 @@ impl Blocktree {
// Create the erasure column family
let erasure_cf = LedgerColumn::new(&db);
// Create the detached heads column family
let detached_heads_cf = LedgerColumn::new(&db);
// Create the orphans column family. An "orphan" is defined as
// the head of a detached chain of slots, i.e. a slot with no
// known parent
let orphans_cf = LedgerColumn::new(&db);
Ok(Blocktree {
db,
meta_cf,
data_cf,
erasure_cf,
detached_heads_cf,
orphans_cf,
new_blobs_signals: vec![],
root_slot: RwLock::new(0),
})
}
@ -189,8 +197,8 @@ impl Blocktree {
self.meta_cf.get(slot)
}
pub fn detached_head(&self, slot: u64) -> Result<Option<bool>> {
self.detached_heads_cf.get(slot)
pub fn orphan(&self, slot: u64) -> Result<Option<bool>> {
self.orphans_cf.get(slot)
}
pub fn reset_slot_consumed(&self, slot: u64) -> Result<()> {
@ -322,11 +330,11 @@ impl Blocktree {
.expect("Expect database get to succeed")
{
let backup = Some(meta.clone());
// If parent_slot == std::u64::MAX, then this is one of the detached heads inserted
// If parent_slot == std::u64::MAX, then this is one of the orphans inserted
// during the chaining process, see the function find_slot_meta_in_cached_state()
// for details. Slots that are detached heads are missing a parent_slot, so we should
// for details. Slots that are orphans are missing a parent_slot, so we should
// fill in the parent now that we know it.
if Self::is_detached_head(&meta) {
if Self::is_orphan(&meta) {
meta.parent_slot = parent_slot;
}
@ -691,6 +699,26 @@ impl Blocktree {
Ok(entries)
}
pub fn set_root(&self, root: u64) {
*self.root_slot.write().unwrap() = root;
}
pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> {
let mut results = vec![];
let mut iter = self.orphans_cf.cursor().unwrap();
iter.seek_to_first();
while iter.valid() {
if let Some(max) = max {
if results.len() >= max {
break;
}
}
results.push(iter.key().unwrap());
iter.next();
}
results
}
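
A minimal usage sketch for the two new accessors (hypothetical caller code; `ledger_path` is assumed, not from this diff):

let blocktree = Blocktree::open(&ledger_path).unwrap();
blocktree.set_root(5); // replay stage mirrors the latest locktower root here
for slot in blocktree.get_orphans(Some(10)) {
    // each returned slot heads a detached chain whose parent is still unknown
    println!("orphan at slot {}", slot);
}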
fn deserialize_blobs<I>(blob_datas: &[I]) -> Vec<Entry>
where
I: Borrow<[u8]>,
@ -752,8 +780,7 @@ impl Blocktree {
.expect("Slot must exist in the working_set hashmap");
{
let is_detached_head =
meta_backup.is_some() && Self::is_detached_head(meta_backup.as_ref().unwrap());
let is_orphan = meta_backup.is_some() && Self::is_orphan(meta_backup.as_ref().unwrap());
let mut meta_mut = meta.borrow_mut();
@ -764,10 +791,10 @@ impl Blocktree {
if slot != 0 {
let prev_slot = meta_mut.parent_slot;
// Check if the slot represented by meta_mut is either a new slot or a detached head.
// Check if the slot represented by meta_mut is either a new slot or an orphan.
// In both cases we need to run the chaining logic b/c the parent on the slot was
// previously unknown.
if meta_backup.is_none() || is_detached_head {
if meta_backup.is_none() || is_orphan {
let prev_slot_meta =
self.find_slot_meta_else_create(working_set, new_chained_slots, prev_slot)?;
@ -778,15 +805,15 @@ impl Blocktree {
&mut meta_mut,
);
if Self::is_detached_head(&RefCell::borrow(&*prev_slot_meta)) {
write_batch.put::<cf::DetachedHeads>(prev_slot, &true)?;
if Self::is_orphan(&RefCell::borrow(&*prev_slot_meta)) {
write_batch.put::<cf::Orphans>(prev_slot, &true)?;
}
}
}
// At this point this slot has received a parent, so no longer a detached head
if is_detached_head {
write_batch.delete::<cf::DetachedHeads>(slot)?;
// At this point this slot has received a parent, so no longer an orphan
if is_orphan {
write_batch.delete::<cf::Orphans>(slot)?;
}
}
@ -844,10 +871,10 @@ impl Blocktree {
Ok(())
}
fn is_detached_head(meta: &SlotMeta) -> bool {
// If we have children, but no parent, then this is the head of a detached chain of
fn is_orphan(meta: &SlotMeta) -> bool {
// If we have no parent, then this is the head of a detached chain of
// slots
meta.parent_slot == std::u64::MAX
!meta.is_parent_set()
}
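
A sketch of the orphan life cycle this predicate encodes (SlotMeta fields as defined above; std::u64::MAX is the "parent unknown" sentinel):

let mut meta = SlotMeta::new(8, std::u64::MAX); // created before its parent was seen
assert!(Blocktree::is_orphan(&meta));
meta.parent_slot = 7; // chaining later fills in the real parent
assert!(!Blocktree::is_orphan(&meta));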
// 1) Chain current_slot to the previous slot defined by prev_slot_meta
@ -865,14 +892,14 @@ impl Blocktree {
fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option<SlotMeta>) -> bool {
slot_meta.is_full()
&& (backup_slot_meta.is_none()
|| Self::is_detached_head(&backup_slot_meta.as_ref().unwrap())
|| Self::is_orphan(&backup_slot_meta.as_ref().unwrap())
|| slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed)
}
// 1) Find the slot metadata in the cache of dirty slot metadata we've previously touched,
// else:
// 2) Search the database for that slot metadata. If still no luck, then:
// 3) Create a dummy `detached head` slot in the database
// 3) Create a dummy orphan slot in the database
fn find_slot_meta_else_create<'a>(
&self,
working_set: &'a HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
@ -888,7 +915,7 @@ impl Blocktree {
}
// Search the database for that slot metadata. If still no luck, then
// create a dummy `detached head` slot in the database
// create a dummy orphan slot in the database
fn find_slot_meta_in_db_else_create<'a>(
&self,
slot: u64,
@ -898,7 +925,7 @@ impl Blocktree {
insert_map.insert(slot, Rc::new(RefCell::new(slot_meta)));
Ok(insert_map.get(&slot).unwrap().clone())
} else {
// If this slot doesn't exist, make a `detached head` slot. This way we
// If this slot doesn't exist, make an orphan slot. This way we
// remember which slots chained to this one when we eventually get a real blob
// for this slot
insert_map.insert(
@ -1939,9 +1966,9 @@ pub mod tests {
for i in 0..num_slots {
// If "i" is the index of a slot we just inserted, then next_slots should be empty
// for slot "i" because no slots chain to that slot, because slot i + 1 is missing.
// However, if it's a slot we haven't inserted, aka one of the gaps, then one of the slots
// we just inserted will chain to that gap, so next_slots for that `detached head`
// slot won't be empty, but the parent slot is unknown so should equal std::u64::MAX.
// However, if it's a slot we haven't inserted, aka one of the gaps, then one of the
// slots we just inserted will chain to that gap, so next_slots for that orphan slot
// won't be empty, but the parent slot is unknown so should equal std::u64::MAX.
let s = blocktree.meta(i as u64).unwrap().unwrap();
if i % 2 == 0 {
assert_eq!(s.next_slots, vec![i as u64 + 1]);
@ -2142,8 +2169,8 @@ pub mod tests {
assert_eq!(expected_children, result);
}
// Detached heads is empty
assert!(blocktree.detached_heads_cf.is_empty().unwrap())
// No orphan slots should exist
assert!(blocktree.orphans_cf.is_empty().unwrap())
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
@ -2187,8 +2214,8 @@ pub mod tests {
}
#[test]
fn test_detached_head() {
let blocktree_path = get_tmp_ledger_path("test_is_detached_head");
fn test_orphans() {
let blocktree_path = get_tmp_ledger_path("test_orphans");
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
@ -2197,48 +2224,48 @@ pub mod tests {
let (blobs, _) = make_many_slot_entries(0, 3, entries_per_slot);
// Write slot 2, which chains to slot 1. We're missing slot 0,
// so slot 1 is the detached head
// so slot 1 is the orphan
blocktree.write_blobs(once(&blobs[2])).unwrap();
let meta = blocktree
.meta(1)
.expect("Expect database get to succeed")
.unwrap();
assert!(Blocktree::is_detached_head(&meta));
assert_eq!(get_detached_heads(&blocktree), vec![1]);
assert!(Blocktree::is_orphan(&meta));
assert_eq!(blocktree.get_orphans(None), vec![1]);
// Write slot 1 which chains to slot 0, so now slot 0 is the
// detached head, and slot 1 is no longer the detached head.
// orphan, and slot 1 is no longer the orphan.
blocktree.write_blobs(once(&blobs[1])).unwrap();
let meta = blocktree
.meta(1)
.expect("Expect database get to succeed")
.unwrap();
assert!(!Blocktree::is_detached_head(&meta));
assert!(!Blocktree::is_orphan(&meta));
let meta = blocktree
.meta(0)
.expect("Expect database get to succeed")
.unwrap();
assert!(Blocktree::is_detached_head(&meta));
assert_eq!(get_detached_heads(&blocktree), vec![0]);
assert!(Blocktree::is_orphan(&meta));
assert_eq!(blocktree.get_orphans(None), vec![0]);
// Write some slot that also chains to existing slots and detached head,
// Write some slots that also chain to existing slots and the orphan,
// nothing should change
let blob4 = &make_slot_entries(4, 0, 1).0[0];
let blob5 = &make_slot_entries(5, 1, 1).0[0];
blocktree.write_blobs(vec![blob4, blob5]).unwrap();
assert_eq!(get_detached_heads(&blocktree), vec![0]);
assert_eq!(blocktree.get_orphans(None), vec![0]);
// Write zeroth slot, no more detached heads
// Write zeroth slot, no more orphans
blocktree.write_blobs(once(&blobs[0])).unwrap();
for i in 0..3 {
let meta = blocktree
.meta(i)
.expect("Expect database get to succeed")
.unwrap();
assert!(!Blocktree::is_detached_head(&meta));
assert!(!Blocktree::is_orphan(&meta));
}
// Detached heads is empty
assert!(blocktree.detached_heads_cf.is_empty().unwrap())
// Orphans cf is empty
assert!(blocktree.orphans_cf.is_empty().unwrap())
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
@ -2502,15 +2529,4 @@ pub mod tests {
(blobs, entries)
}
fn get_detached_heads(blocktree: &Blocktree) -> Vec<u64> {
let mut results = vec![];
let mut iter = blocktree.detached_heads_cf.cursor().unwrap();
iter.seek_to_first();
while iter.valid() {
results.push(iter.key().unwrap());
iter.next();
}
results
}
}

View File

@ -17,8 +17,8 @@ pub mod columns {
pub struct SlotMeta;
#[derive(Debug)]
/// DetachedHeads Column
pub struct DetachedHeads;
/// Orphans Column
pub struct Orphans;
#[derive(Debug)]
/// Erasure Column

View File

@ -100,8 +100,8 @@ impl Column<Kvs> for cf::Data {
}
}
impl Column<Kvs> for cf::DetachedHeads {
const NAME: &'static str = super::DETACHED_HEADS_CF;
impl Column<Kvs> for cf::Orphans {
const NAME: &'static str = super::ORPHANS_CF;
type Index = u64;
fn key(slot: u64) -> Key {
@ -115,7 +115,7 @@ impl Column<Kvs> for cf::DetachedHeads {
}
}
impl TypedColumn<Kvs> for cf::DetachedHeads {
impl TypedColumn<Kvs> for cf::Orphans {
type Type = bool;
}

View File

@ -30,7 +30,7 @@ impl Backend for Rocks {
type Error = rocksdb::Error;
fn open(path: &Path) -> Result<Rocks> {
use crate::blocktree::db::columns::{Coding, Data, DetachedHeads, SlotMeta};
use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta};
fs::create_dir_all(&path)?;
@ -41,14 +41,13 @@ impl Backend for Rocks {
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
let detached_heads_descriptor =
ColumnFamilyDescriptor::new(DetachedHeads::NAME, get_cf_options());
let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
let cfs = vec![
meta_cf_descriptor,
data_cf_descriptor,
erasure_cf_descriptor,
detached_heads_descriptor,
orphans_cf_descriptor,
];
// Open the database
@ -58,14 +57,9 @@ impl Backend for Rocks {
}
fn columns(&self) -> Vec<&'static str> {
use crate::blocktree::db::columns::{Coding, Data, DetachedHeads, SlotMeta};
use crate::blocktree::db::columns::{Coding, Data, Orphans, SlotMeta};
vec![
Coding::NAME,
Data::NAME,
DetachedHeads::NAME,
SlotMeta::NAME,
]
vec![Coding::NAME, Data::NAME, Orphans::NAME, SlotMeta::NAME]
}
fn destroy(path: &Path) -> Result<()> {
@ -148,8 +142,8 @@ impl Column<Rocks> for cf::Data {
}
}
impl Column<Rocks> for cf::DetachedHeads {
const NAME: &'static str = super::DETACHED_HEADS_CF;
impl Column<Rocks> for cf::Orphans {
const NAME: &'static str = super::ORPHANS_CF;
type Index = u64;
fn key(slot: u64) -> Vec<u8> {
@ -163,7 +157,7 @@ impl Column<Rocks> for cf::DetachedHeads {
}
}
impl TypedColumn<Rocks> for cf::DetachedHeads {
impl TypedColumn<Rocks> for cf::Orphans {
type Type = bool;
}

View File

@ -20,6 +20,7 @@ use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
use crate::crds_value::{CrdsValue, CrdsValueLabel, Vote};
use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
use crate::repair_service::RepairType;
use crate::result::Result;
use crate::staking_utils;
use crate::streamer::{BlobReceiver, BlobSender};
@ -58,6 +59,9 @@ pub const GROW_LAYER_CAPACITY: bool = false;
/// milliseconds we sleep for between gossip requests
pub const GOSSIP_SLEEP_MILLIS: u64 = 100;
/// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
#[derive(Debug, PartialEq, Eq)]
pub enum ClusterInfoError {
NoPeers,
@ -161,6 +165,7 @@ enum Protocol {
/// TODO: move this message to a different module
RequestWindowIndex(ContactInfo, u64, u64),
RequestHighestWindowIndex(ContactInfo, u64, u64),
RequestOrphan(ContactInfo, u64),
}
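
A sketch of how the new variant goes on the wire (mirrors orphan_bytes below; repair requests are bincode-serialized like the rest of Protocol):

let req = Protocol::RequestOrphan(cluster_info.my_data().clone(), slot);
let bytes = serialize(&req)?; // sent to a repair peer's gossip address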
impl ClusterInfo {
@ -308,6 +313,7 @@ impl ClusterInfo {
.collect();
let max_ts = votes.iter().map(|x| x.0).max().unwrap_or(since);
let txs: Vec<Transaction> = votes.into_iter().map(|x| x.1).collect();
(txs, max_ts)
}
@ -746,12 +752,13 @@ impl ClusterInfo {
Ok(out)
}
pub fn window_index_request(
&self,
slot: u64,
blob_index: u64,
get_highest: bool,
) -> Result<(SocketAddr, Vec<u8>)> {
fn orphan_bytes(&self, slot: u64) -> Result<Vec<u8>> {
let req = Protocol::RequestOrphan(self.my_data().clone(), slot);
let out = serialize(&req)?;
Ok(out)
}
pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication, as indicated
// by a valid tvu port location
let valid: Vec<_> = self.repair_peers();
@ -761,19 +768,39 @@ impl ClusterInfo {
let n = thread_rng().gen::<usize>() % valid.len();
let addr = valid[n].gossip; // send the request to the peer's gossip port
let out = {
if get_highest {
self.window_highest_index_request_bytes(slot, blob_index)?
} else {
self.window_index_request_bytes(slot, blob_index)?
match repair_request {
RepairType::Blob(slot, blob_index) => {
submit(
influxdb::Point::new("cluster_info-repair")
.add_field("repair-slot", influxdb::Value::Integer(*slot as i64))
.add_field("repair-ix", influxdb::Value::Integer(*blob_index as i64))
.to_owned(),
);
self.window_index_request_bytes(*slot, *blob_index)?
}
RepairType::HighestBlob(slot, blob_index) => {
submit(
influxdb::Point::new("cluster_info-repair_highest")
.add_field(
"repair-highest-slot",
influxdb::Value::Integer(*slot as i64),
)
.add_field("repair-highest-ix", influxdb::Value::Integer(*slot as i64))
.to_owned(),
);
self.window_highest_index_request_bytes(*slot, *blob_index)?
}
RepairType::Orphan(slot) => {
submit(
influxdb::Point::new("cluster_info-repair_orphan")
.add_field("repair-orphan", influxdb::Value::Integer(*slot as i64))
.to_owned(),
);
self.orphan_bytes(*slot)?
}
}
};
submit(
influxdb::Point::new("cluster-info")
.add_field("repair-ix", influxdb::Value::Integer(blob_index as i64))
.to_owned(),
);
Ok((addr, out))
}
// If the network entrypoint hasn't been discovered yet, add it to the crds table
@ -966,6 +993,35 @@ impl ClusterInfo {
vec![]
}
fn run_orphan(
from_addr: &SocketAddr,
blocktree: Option<&Arc<Blocktree>>,
mut slot: u64,
max_responses: usize,
) -> Vec<SharedBlob> {
let mut res = vec![];
if let Some(blocktree) = blocktree {
// Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blocktree.meta(slot) {
if meta.received == 0 {
break;
}
let blob = blocktree.get_data_blob(slot, meta.received - 1);
if let Ok(Some(mut blob)) = blob {
blob.meta.set_addr(from_addr);
res.push(Arc::new(RwLock::new(blob)));
}
if meta.is_parent_set() && res.len() < max_responses {
slot = meta.parent_slot;
} else {
break;
}
}
}
res
}
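
The requester's side of this exchange, for context (a hedged sketch; only repair_request is from this diff, the socket wiring is illustrative):

let (addr, bytes) = cluster_info.repair_request(&RepairType::Orphan(slot))?;
repair_socket.send_to(&bytes, addr)?;
// The peer replies with the highest blob of `slot` and of up to
// MAX_ORPHAN_REPAIR_RESPONSES known ancestors; inserting those blobs fills in
// parent links until the chain reaches the root or exposes an older orphan.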
//TODO we should first coalesce all the requests
fn handle_blob(
obj: &Arc<RwLock<Self>>,
@ -1082,14 +1138,21 @@ impl ClusterInfo {
vec![]
}
}
fn handle_request_window_index(
fn get_repair_sender(request: &Protocol) -> &ContactInfo {
match request {
Protocol::RequestWindowIndex(ref from, _, _) => from,
Protocol::RequestHighestWindowIndex(ref from, _, _) => from,
Protocol::RequestOrphan(ref from, _) => from,
_ => panic!("Not a repair request"),
}
}
fn handle_repair(
me: &Arc<RwLock<Self>>,
from: &ContactInfo,
blocktree: Option<&Arc<Blocktree>>,
slot: u64,
blob_index: u64,
from_addr: &SocketAddr,
is_get_highest: bool,
blocktree: Option<&Arc<Blocktree>>,
request: Protocol,
) -> Vec<SharedBlob> {
let now = Instant::now();
@ -1098,12 +1161,13 @@ impl ClusterInfo {
//TODO verify from is signed
let self_id = me.read().unwrap().gossip.id;
let from = Self::get_repair_sender(&request);
if from.id == me.read().unwrap().gossip.id {
warn!(
"{}: Ignored received RequestWindowIndex from ME {} {} {} ",
self_id, from.id, slot, blob_index,
"{}: Ignored received repair request from ME {}",
self_id, from.id,
);
inc_new_counter_info!("cluster_info-window-request-address-eq", 1);
inc_new_counter_info!("cluster_info-handle-repair--eq", 1);
return vec![];
}
@ -1113,26 +1177,49 @@ impl ClusterInfo {
.crds
.update_record_timestamp(&from.id, timestamp());
let my_info = me.read().unwrap().my_data().clone();
inc_new_counter_info!("cluster_info-window-request-recv", 1);
trace!(
"{}: received RequestWindowIndex from: {} slot: {}, blob_index: {}",
self_id,
from.id,
slot,
blob_index,
);
let res = {
if is_get_highest {
Self::run_highest_window_request(&from_addr, blocktree, slot, blob_index)
} else {
Self::run_window_request(&from, &from_addr, blocktree, &my_info, slot, blob_index)
let (res, label) = {
match &request {
Protocol::RequestWindowIndex(from, slot, blob_index) => {
inc_new_counter_info!("cluster_info-request-window-index", 1);
(
Self::run_window_request(
from,
&from_addr,
blocktree,
&my_info,
*slot,
*blob_index,
),
"RequestWindowIndex",
)
}
Protocol::RequestHighestWindowIndex(_, slot, highest_index) => {
inc_new_counter_info!("cluster_info-request-highest-window-index", 1);
(
Self::run_highest_window_request(
&from_addr,
blocktree,
*slot,
*highest_index,
),
"RequestHighestWindowIndex",
)
}
Protocol::RequestOrphan(_, slot) => {
inc_new_counter_info!("cluster_info-request-orphan", 1);
(
Self::run_orphan(&from_addr, blocktree, *slot, MAX_ORPHAN_REPAIR_RESPONSES),
"RequestOrphan",
)
}
_ => panic!("Not a repair request"),
}
};
report_time_spent(
"RequestWindowIndex",
&now.elapsed(),
&format!("slot {}, blob_index: {}", slot, blob_index),
);
trace!("{}: received repair request: {:?}", self_id, request);
report_time_spent(label, &now.elapsed(), "");
res
}
@ -1198,22 +1285,7 @@ impl ClusterInfo {
}
vec![]
}
Protocol::RequestWindowIndex(from, slot, blob_index) => {
Self::handle_request_window_index(
me, &from, blocktree, slot, blob_index, from_addr, false,
)
}
Protocol::RequestHighestWindowIndex(from, slot, highest_index) => {
Self::handle_request_window_index(
me,
&from,
blocktree,
slot,
highest_index,
from_addr,
true,
)
}
_ => Self::handle_repair(me, from_addr, blocktree, request),
}
}
@ -1522,9 +1594,11 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
mod tests {
use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::tests::make_many_slot_entries;
use crate::blocktree::Blocktree;
use crate::crds_value::CrdsValueLabel;
use crate::packet::BLOB_HEADER_SIZE;
use crate::repair_service::RepairType;
use crate::result::Error;
use crate::test_tx::test_tx;
use solana_sdk::signature::{Keypair, KeypairUtil};
@ -1591,7 +1665,7 @@ mod tests {
fn window_index_request() {
let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(me);
let rv = cluster_info.window_index_request(0, 0, false);
let rv = cluster_info.repair_request(&RepairType::Blob(0, 0));
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
let gossip_addr = socketaddr!([127, 0, 0, 1], 1234);
@ -1607,7 +1681,9 @@ mod tests {
0,
);
cluster_info.insert_info(nxt.clone());
let rv = cluster_info.window_index_request(0, 0, false).unwrap();
let rv = cluster_info
.repair_request(&RepairType::Blob(0, 0))
.unwrap();
assert_eq!(nxt.gossip, gossip_addr);
assert_eq!(rv.0, nxt.gossip);
@ -1628,7 +1704,9 @@ mod tests {
let mut two = false;
while !one || !two {
//this randomly picks an option, so eventually it should pick both
let rv = cluster_info.window_index_request(0, 0, false).unwrap();
let rv = cluster_info
.repair_request(&RepairType::Blob(0, 0))
.unwrap();
if rv.0 == gossip_addr {
one = true;
}
@ -1746,6 +1824,42 @@ mod tests {
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn run_orphan() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap());
let rv = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 2, 0);
assert!(rv.is_empty());
// Create slots 1, 2, 3 with 5 blobs apiece
let (blobs, _) = make_many_slot_entries(1, 3, 5);
blocktree
.write_blobs(&blobs)
.expect("Expect successful ledger write");
// We don't have slot 4, so we don't know how to service this request
let rv = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 4, 5);
assert!(rv.is_empty());
// For slot 3, we should return the highest blobs from slots 3, 2, 1 respectively
// for this request
let rv: Vec<_> = ClusterInfo::run_orphan(&socketaddr_any!(), Some(&blocktree), 3, 5)
.iter()
.map(|b| b.read().unwrap().clone())
.collect();
let expected: Vec<_> = (1..=3)
.rev()
.map(|slot| blocktree.get_data_blob(slot, 4).unwrap().unwrap())
.collect();
assert_eq!(rv, expected)
}
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_default_leader() {
solana_logger::setup();

View File

@ -254,6 +254,10 @@ impl Locktower {
}
}
pub fn root(&self) -> Option<u64> {
self.lockouts.root_slot
}
pub fn calculate_weight(&self, stake_lockouts: &HashMap<u64, StakeLockout>) -> u128 {
let mut sum = 0u128;
let root_slot = self.lockouts.root_slot.unwrap_or(0);

View File

@ -16,28 +16,16 @@ use std::time::Duration;
pub const MAX_REPAIR_LENGTH: usize = 16;
pub const REPAIR_MS: u64 = 100;
pub const MAX_REPAIR_TRIES: u64 = 128;
pub const NUM_FORKS_TO_REPAIR: usize = 5;
pub const MAX_ORPHANS: usize = 5;
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepairType {
Orphan(u64),
HighestBlob(u64, u64),
Blob(u64, u64),
}
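
The three request shapes side by side (illustrative values):

let fill_hole = RepairType::Blob(10, 25);       // fetch blob 25 of slot 10
let catch_up = RepairType::HighestBlob(10, 40); // fetch slot 10's highest blob, given we have through index 40
let find_parents = RepairType::Orphan(10);      // fetch the highest blobs of slot 10 and its known ancestors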
#[derive(Default)]
struct RepairInfo {
max_slot: u64,
repair_tries: u64,
}
impl RepairInfo {
fn new() -> Self {
RepairInfo {
max_slot: 0,
repair_tries: 0,
}
}
}
pub struct RepairSlotRange {
pub start: u64,
pub end: u64,
@ -62,50 +50,36 @@ impl RepairService {
exit: Arc<AtomicBool>,
repair_socket: &Arc<UdpSocket>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
repair_slot_range: RepairSlotRange,
_repair_slot_range: RepairSlotRange,
) {
let mut repair_info = RepairInfo::new();
let id = cluster_info.read().unwrap().id();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let repairs = Self::generate_repairs(
blocktree,
MAX_REPAIR_LENGTH,
&mut repair_info,
&repair_slot_range,
);
let repairs = Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH);
if let Ok(repairs) = repairs {
let reqs: Vec<_> = repairs
.into_iter()
.filter_map(|repair_request| {
let (slot, blob_index, is_highest_request) = {
match repair_request {
RepairType::Blob(s, i) => (s, i, false),
RepairType::HighestBlob(s, i) => (s, i, true),
}
};
cluster_info
.read()
.unwrap()
.window_index_request(slot, blob_index, is_highest_request)
.map(|result| (result, slot, blob_index))
.repair_request(&repair_request)
.map(|result| (result, repair_request))
.ok()
})
.collect();
for ((to, req), slot, blob_index) in reqs {
for ((to, req), repair_request) in reqs {
if let Ok(local_addr) = repair_socket.local_addr() {
submit(
influxdb::Point::new("repair_service")
.add_field("repair_slot", influxdb::Value::Integer(slot as i64))
.to_owned()
.add_field(
"repair_blob",
influxdb::Value::Integer(blob_index as i64),
"repair_request",
influxdb::Value::String(format!("{:?}", repair_request)),
)
.to_owned()
.add_field("to", influxdb::Value::String(to.to_string()))
@ -151,7 +125,7 @@ impl RepairService {
RepairService { t_repair }
}
fn process_slot(
fn generate_repairs_for_slot(
blocktree: &Blocktree,
slot: u64,
slot_meta: &SlotMeta,
@ -175,49 +149,49 @@ impl RepairService {
}
}
fn generate_repairs(
blocktree: &Blocktree,
max_repairs: usize,
repair_info: &mut RepairInfo,
repair_range: &RepairSlotRange,
) -> Result<(Vec<RepairType>)> {
fn generate_repairs(blocktree: &Blocktree, max_repairs: usize) -> Result<(Vec<RepairType>)> {
// Slot height and blob indexes for blobs we want to repair
let mut repairs: Vec<RepairType> = vec![];
let mut current_slot = Some(repair_range.start);
while repairs.len() < max_repairs && current_slot.is_some() {
if current_slot.unwrap() > repair_range.end {
break;
}
let slot = *blocktree.root_slot.read().unwrap();
Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, slot);
if current_slot.unwrap() > repair_info.max_slot {
repair_info.repair_tries = 0;
repair_info.max_slot = current_slot.unwrap();
}
// TODO: Incorporate gossip to determine priorities for repair?
if let Some(slot) = blocktree.meta(current_slot.unwrap())? {
let new_repairs = Self::process_slot(
// Try to resolve orphans in blocktree
let orphans = blocktree.get_orphans(Some(MAX_ORPHANS));
Self::generate_repairs_for_orphans(&orphans[..], &mut repairs);
Ok(repairs)
}
fn generate_repairs_for_orphans(orphans: &[u64], repairs: &mut Vec<RepairType>) {
repairs.extend(orphans.iter().map(|h| RepairType::Orphan(*h)));
}
/// Repairs any fork starting at the input slot
fn generate_repairs_for_fork(
blocktree: &Blocktree,
repairs: &mut Vec<RepairType>,
max_repairs: usize,
slot: u64,
) {
let mut pending_slots = vec![slot];
while repairs.len() < max_repairs && !pending_slots.is_empty() {
let slot = pending_slots.pop().unwrap();
if let Some(slot_meta) = blocktree.meta(slot).unwrap() {
let new_repairs = Self::generate_repairs_for_slot(
blocktree,
current_slot.unwrap(),
&slot,
slot,
&slot_meta,
max_repairs - repairs.len(),
);
repairs.extend(new_repairs);
let next_slots = slot_meta.next_slots;
pending_slots.extend(next_slots);
} else {
break;
}
current_slot = blocktree.get_next_slot(current_slot.unwrap())?;
}
// Only increment repair_tries if the ledger contains every blob for every slot
if repairs.is_empty() {
repair_info.repair_tries += 1;
}
// Optimistically try the next slot if we haven't gotten any repairs
// for a while
if repair_info.repair_tries >= MAX_REPAIR_TRIES {
repairs.push(RepairType::HighestBlob(repair_info.max_slot + 1, 0))
}
Ok(repairs)
}
}
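
A sketch of the call pattern and traversal order (names from this diff; the fork shape is hypothetical):

let root = *blocktree.root_slot.read().unwrap();
let mut repairs = vec![];
RepairService::generate_repairs_for_fork(&blocktree, &mut repairs, MAX_REPAIR_LENGTH, root);
// With forks 0 -> 1 -> 3 and 0 -> 2 rooted at 0, pending_slots starts as [0]:
// the loop pops 0, pushes next_slots [1, 2], then pops 2, then 1, then 3 -- a
// depth-first walk that stops once `max_repairs` requests are collected.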
@ -234,67 +208,26 @@ mod test {
use super::*;
use crate::blocktree::tests::{make_many_slot_entries, make_slot_entries};
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::entry::create_ticks;
use crate::entry::{make_tiny_test_entries, EntrySlice};
use solana_sdk::hash::Hash;
#[test]
pub fn test_repair_missed_future_slot() {
pub fn test_repair_orphan() {
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let mut blobs = create_ticks(1, Hash::default()).to_blobs();
blobs[0].set_index(0);
blobs[0].set_slot(0);
blobs[0].set_is_last_in_slot();
blocktree.write_blobs(&blobs).unwrap();
let mut repair_info = RepairInfo::new();
let repair_slot_range = RepairSlotRange::default();
// We have all the blobs for all the slots in the ledger, wait for optimistic
// future repair after MAX_REPAIR_TRIES
for i in 0..MAX_REPAIR_TRIES {
// Check that repair tries to patch the empty slot
assert_eq!(repair_info.repair_tries, i);
assert_eq!(repair_info.max_slot, 0);
let expected = if i == MAX_REPAIR_TRIES - 1 {
vec![RepairType::HighestBlob(1, 0)]
} else {
vec![]
};
assert_eq!(
RepairService::generate_repairs(
&blocktree,
2,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
expected
);
}
// Insert a bigger blob, see that repair_tries gets reset
let mut blobs = create_ticks(1, Hash::default()).to_blobs();
blobs[0].set_index(0);
blobs[0].set_slot(1);
blobs[0].set_is_last_in_slot();
// Create some orphan slots
let (mut blobs, _) = make_slot_entries(1, 0, 1);
let (blobs2, _) = make_slot_entries(5, 2, 1);
blobs.extend(blobs2);
blocktree.write_blobs(&blobs).unwrap();
assert_eq!(
RepairService::generate_repairs(
&blocktree,
2,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
vec![]
RepairService::generate_repairs(&blocktree, 2).unwrap(),
vec![
RepairType::HighestBlob(0, 0),
RepairType::Orphan(0),
RepairType::Orphan(2)
]
);
assert_eq!(repair_info.repair_tries, 1);
assert_eq!(repair_info.max_slot, 1);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
@ -306,25 +239,16 @@ mod test {
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let mut blobs = make_tiny_test_entries(1).to_blobs();
blobs[0].set_index(1);
blobs[0].set_slot(2);
let (blobs, _) = make_slot_entries(2, 0, 1);
let mut repair_info = RepairInfo::new();
// Write this blob to slot 2, should chain to slot 1, which we haven't received
// Write this blob to slot 2, should chain to slot 0, which we haven't received
// any blobs for
blocktree.write_blobs(&blobs).unwrap();
// Check that repair tries to patch the empty slot
assert_eq!(
RepairService::generate_repairs(
&blocktree,
2,
&mut repair_info,
&RepairSlotRange::default()
)
.unwrap(),
vec![RepairType::HighestBlob(0, 0), RepairType::Blob(2, 0)]
RepairService::generate_repairs(&blocktree, 2).unwrap(),
vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(0)]
);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
@ -340,8 +264,6 @@ mod test {
let num_entries_per_slot = 5 * nth;
let num_slots = 2;
let mut repair_info = RepairInfo::new();
// Create some blobs
let (blobs, _) =
make_many_slot_entries(0, num_slots as u64, num_entries_per_slot as u64);
@ -363,28 +285,13 @@ mod test {
})
.collect();
// Across all slots, find all missing indexes in the range [0, num_entries_per_slot]
let repair_slot_range = RepairSlotRange::default();
assert_eq!(
RepairService::generate_repairs(
&blocktree,
std::usize::MAX,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
RepairService::generate_repairs(&blocktree, std::usize::MAX).unwrap(),
expected
);
assert_eq!(
RepairService::generate_repairs(
&blocktree,
expected.len() - 2,
&mut repair_info,
&repair_slot_range
)
.unwrap()[..],
RepairService::generate_repairs(&blocktree, expected.len() - 2).unwrap()[..],
expected[0..expected.len() - 2]
);
}
@ -399,8 +306,6 @@ mod test {
let num_entries_per_slot = 10;
let mut repair_info = RepairInfo::new();
// Create some blobs
let (mut blobs, _) = make_slot_entries(0, 0, num_entries_per_slot as u64);
@ -412,23 +317,15 @@ mod test {
// We didn't get the last blob for this slot, so ask for the highest blob for that slot
let expected: Vec<RepairType> = vec![RepairType::HighestBlob(0, num_entries_per_slot)];
let repair_slot_range = RepairSlotRange::default();
assert_eq!(
RepairService::generate_repairs(
&blocktree,
std::usize::MAX,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
RepairService::generate_repairs(&blocktree, std::usize::MAX).unwrap(),
expected
);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
/*#[test]
pub fn test_repair_range() {
let blocktree_path = get_tmp_ledger_path!();
{
@ -456,17 +353,11 @@ mod test {
repair_slot_range.end = end;
assert_eq!(
RepairService::generate_repairs(
&blocktree,
std::usize::MAX,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
RepairService::generate_repairs(&blocktree, std::usize::MAX, &mut repair_info,)
.unwrap(),
expected
);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
}*/
}

View File

@ -97,7 +97,9 @@ impl ReplayStage {
let vote_account = *vote_account;
let mut ticks_per_slot = 0;
let mut locktower = Locktower::new_from_forks(&bank_forks.read().unwrap(), &my_id);
if let Some(root) = locktower.root() {
blocktree.set_root(root);
}
// Start the replay stage loop
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
@ -145,6 +147,7 @@ impl ReplayStage {
&voting_keypair,
&vote_account,
&cluster_info,
&blocktree,
);
Self::reset_poh_recorder(
@ -292,6 +295,7 @@ impl ReplayStage {
voting_keypair: &Option<Arc<T>>,
vote_account: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
blocktree: &Arc<Blocktree>,
) where
T: 'static + KeypairUtil + Send + Sync,
{
@ -304,6 +308,7 @@ impl ReplayStage {
);
if let Some(new_root) = locktower.record_vote(bank.slot()) {
bank_forks.write().unwrap().set_root(new_root);
blocktree.set_root(new_root);
Self::handle_new_root(&bank_forks, progress);
}
locktower.update_epoch(&bank);
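
A note on why both stores learn the root (ties this file to repair_service.rs above): RepairService reads blocktree.root_slot at the start of every pass, so each new locktower root must be mirrored into the blocktree, or generate_repairs_for_fork would keep walking from a stale slot.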

View File

@ -3924,10 +3924,10 @@
],
"type": "fill"
}
],
],
"orderByTime": "ASC",
"policy": "default",
"query": "SELECT last(\"repair-ix\") AS \"repair\" FROM \"$testnet\".\"autogen\".\"cluster-info\" WHERE host_id =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"query": "SELECT last(\"repair-ix\") AS \"repair\" FROM \"$testnet\".\"autogen\".\"cluster_info-repair\" WHERE host_id =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "C",
"resultFormat": "time_series",