Support multiple forks in the ledger (#2277)

* Modify db_ledger to support per_slot metadata, add signal for updates, and add chaining to slots in db_ledger

* Modify replay stage to ask db_ledger for updates based on slots

* Add repair send/receive metrics

* Add repair service, remove old repair code

* Fix tmp_copy_ledger and setup for tests to account for multiple slots and tick limits within slots
This commit is contained in:
carllin 2019-02-07 15:10:54 -08:00 committed by GitHub
parent 5bb4ac9873
commit fd7db7a954
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 2066 additions and 819 deletions

2
Cargo.lock generated
View File

@ -1,3 +1,5 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "MacTypes-sys"
version = "1.3.0"

View File

@ -82,7 +82,7 @@ impl Broadcast {
.collect();
// TODO: blob_index should be slot-relative...
index_blobs(&blobs, &self.id, self.blob_index, &slots);
index_blobs(&blobs, &self.id, &mut self.blob_index, &slots);
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
@ -92,9 +92,6 @@ impl Broadcast {
blob_sender.send(blobs.clone())?;
// don't count coding blobs in the blob indexes
self.blob_index += blobs.len() as u64;
// Send out data
ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?;
@ -180,11 +177,12 @@ pub struct BroadcastService {
}
impl BroadcastService {
#[allow(clippy::too_many_arguments)]
fn run(
bank: &Arc<Bank>,
sock: &UdpSocket,
cluster_info: &Arc<RwLock<ClusterInfo>>,
entry_height: u64,
blob_index: u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
receiver: &Receiver<Vec<Entry>>,
max_tick_height: u64,
@ -196,7 +194,7 @@ impl BroadcastService {
let mut broadcast = Broadcast {
id: me.id,
max_tick_height,
blob_index: entry_height,
blob_index,
#[cfg(feature = "erasure")]
coding_generator: CodingGenerator::new(),
};
@ -246,11 +244,12 @@ impl BroadcastService {
/// WriteStage is the last stage in the pipeline), which will then close Broadcast service,
/// which will then close FetchStage in the Tpu, and then the rest of the Tpu,
/// completing the cycle.
#[allow(clippy::too_many_arguments)]
pub fn new(
bank: Arc<Bank>,
sock: UdpSocket,
cluster_info: Arc<RwLock<ClusterInfo>>,
entry_height: u64,
blob_index: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
receiver: Receiver<Vec<Entry>>,
max_tick_height: u64,
@ -267,7 +266,7 @@ impl BroadcastService {
&bank,
&sock,
&cluster_info,
entry_height,
blob_index,
&leader_scheduler,
&receiver,
max_tick_height,
@ -315,7 +314,7 @@ mod test {
ledger_path: &str,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
entry_receiver: Receiver<Vec<Entry>>,
entry_height: u64,
blob_index: u64,
max_tick_height: u64,
) -> MockBroadcastService {
// Make the database ledger
@ -343,7 +342,7 @@ mod test {
bank.clone(),
leader_info.sockets.broadcast,
cluster_info,
entry_height,
blob_index,
leader_scheduler,
entry_receiver,
max_tick_height,
@ -361,7 +360,7 @@ mod test {
#[ignore]
//TODO this test won't work since broadcast stage no longer edits the ledger
fn test_broadcast_ledger() {
let ledger_path = get_tmp_ledger_path("test_broadcast");
let ledger_path = get_tmp_ledger_path("test_broadcast_ledger");
{
// Create the leader scheduler
let leader_keypair = Keypair::new();
@ -370,8 +369,7 @@ mod test {
// Mock the tick height to look like the tick height right after a leader transition
leader_scheduler.set_leader_schedule(vec![leader_keypair.pubkey()]);
let start_tick_height = 0;
let max_tick_height = start_tick_height + leader_scheduler.ticks_per_epoch;
let entry_height = 2 * start_tick_height;
let max_tick_height = start_tick_height + leader_scheduler.ticks_per_slot;
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
let (entry_sender, entry_receiver) = channel();
@ -380,7 +378,7 @@ mod test {
&ledger_path,
leader_scheduler.clone(),
entry_receiver,
entry_height,
0,
max_tick_height,
);
@ -395,13 +393,16 @@ mod test {
sleep(Duration::from_millis(2000));
let db_ledger = broadcast_service.db_ledger;
let mut blob_index = 0;
for i in 0..max_tick_height - start_tick_height {
let slot = leader_scheduler
.read()
.unwrap()
.tick_height_to_slot(start_tick_height + i + 1);
let result = db_ledger.get_data_blob(slot, entry_height + i).unwrap();
let result = db_ledger.get_data_blob(slot, blob_index).unwrap();
blob_index += 1;
assert!(result.is_some());
}

View File

@ -145,7 +145,7 @@ enum Protocol {
/// Window protocol messages
/// TODO: move this message to a different module
RequestWindowIndex(NodeInfo, u64),
RequestWindowIndex(NodeInfo, u64, u64),
}
impl ClusterInfo {
@ -692,13 +692,17 @@ impl ClusterInfo {
orders
}
pub fn window_index_request_bytes(&self, ix: u64) -> Result<Vec<u8>> {
let req = Protocol::RequestWindowIndex(self.my_data().clone(), ix);
pub fn window_index_request_bytes(&self, slot_height: u64, blob_index: u64) -> Result<Vec<u8>> {
let req = Protocol::RequestWindowIndex(self.my_data().clone(), slot_height, blob_index);
let out = serialize(&req)?;
Ok(out)
}
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
pub fn window_index_request(
&self,
slot_height: u64,
blob_index: u64,
) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication, as indicated
// by a valid tvu port location
let valid: Vec<_> = self.repair_peers();
@ -707,11 +711,11 @@ impl ClusterInfo {
}
let n = thread_rng().gen::<usize>() % valid.len();
let addr = valid[n].gossip; // send the request to the peer's gossip port
let out = self.window_index_request_bytes(ix)?;
let out = self.window_index_request_bytes(slot_height, blob_index)?;
submit(
influxdb::Point::new("cluster-info")
.add_field("repair-ix", influxdb::Value::Integer(ix as i64))
.add_field("repair-ix", influxdb::Value::Integer(blob_index as i64))
.to_owned(),
);
@ -835,34 +839,38 @@ impl ClusterInfo {
})
.unwrap()
}
// TODO: To support repairing multiple slots, broadcast needs to reset
// blob index for every slot, and window requests should be by slot + index.
// Issue: https://github.com/solana-labs/solana/issues/2440
fn run_window_request(
from: &NodeInfo,
from_addr: &SocketAddr,
db_ledger: Option<&Arc<DbLedger>>,
me: &NodeInfo,
ix: u64,
slot_height: u64,
blob_index: u64,
) -> Vec<SharedBlob> {
if let Some(db_ledger) = db_ledger {
let meta = db_ledger.meta();
// Try to find the requested index in one of the slots
let blob = db_ledger.get_data_blob(slot_height, blob_index);
if let Ok(Some(meta)) = meta {
let max_slot = meta.received_slot;
// Try to find the requested index in one of the slots
for i in 0..=max_slot {
let blob = db_ledger.get_data_blob(i, ix);
if let Ok(Some(mut blob)) = blob {
inc_new_counter_info!("cluster_info-window-request-ledger", 1);
blob.meta.set_addr(from_addr);
if let Ok(Some(mut blob)) = blob {
inc_new_counter_info!("cluster_info-window-request-ledger", 1);
blob.meta.set_addr(from_addr);
return vec![Arc::new(RwLock::new(blob))];
}
}
return vec![Arc::new(RwLock::new(blob))];
}
}
inc_new_counter_info!("cluster_info-window-request-fail", 1);
trace!("{}: failed RequestWindowIndex {} {}", me.id, from.id, ix,);
trace!(
"{}: failed RequestWindowIndex {} {} {}",
me.id,
from.id,
slot_height,
blob_index,
);
vec![]
}
@ -987,7 +995,8 @@ impl ClusterInfo {
me: &Arc<RwLock<Self>>,
from: &ContactInfo,
db_ledger: Option<&Arc<DbLedger>>,
ix: u64,
slot_height: u64,
blob_index: u64,
from_addr: &SocketAddr,
) -> Vec<SharedBlob> {
let now = Instant::now();
@ -999,8 +1008,8 @@ impl ClusterInfo {
let self_id = me.read().unwrap().gossip.id;
if from.id == me.read().unwrap().gossip.id {
warn!(
"{}: Ignored received RequestWindowIndex from ME {} {} ",
self_id, from.id, ix,
"{}: Ignored received RequestWindowIndex from ME {} {} {} ",
self_id, from.id, slot_height, blob_index,
);
inc_new_counter_info!("cluster_info-window-request-address-eq", 1);
return vec![];
@ -1010,16 +1019,24 @@ impl ClusterInfo {
let my_info = me.read().unwrap().my_data().clone();
inc_new_counter_info!("cluster_info-window-request-recv", 1);
trace!(
"{}: received RequestWindowIndex from: {} index: {} ",
"{}: received RequestWindowIndex from: {} slot_height: {}, blob_index: {}",
self_id,
from.id,
ix,
slot_height,
blob_index,
);
let res = Self::run_window_request(
&from,
&from_addr,
db_ledger,
&my_info,
slot_height,
blob_index,
);
let res = Self::run_window_request(&from, &from_addr, db_ledger, &my_info, ix);
report_time_spent(
"RequestWindowIndex",
&now.elapsed(),
&format!(" ix: {}", ix),
&format!("slot_height {}, blob_index: {}", slot_height, blob_index),
);
res
}
@ -1081,8 +1098,15 @@ impl ClusterInfo {
}
vec![]
}
Protocol::RequestWindowIndex(from, ix) => {
Self::handle_request_window_index(me, &from, db_ledger, ix, from_addr)
Protocol::RequestWindowIndex(from, slot_height, blob_index) => {
Self::handle_request_window_index(
me,
&from,
db_ledger,
slot_height,
blob_index,
from_addr,
)
}
}
}
@ -1330,7 +1354,7 @@ mod tests {
fn window_index_request() {
let me = NodeInfo::new_localhost(Keypair::new().pubkey(), timestamp());
let mut cluster_info = ClusterInfo::new(me);
let rv = cluster_info.window_index_request(0);
let rv = cluster_info.window_index_request(0, 0);
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
let gossip_addr = socketaddr!([127, 0, 0, 1], 1234);
@ -1345,7 +1369,7 @@ mod tests {
0,
);
cluster_info.insert_info(nxt.clone());
let rv = cluster_info.window_index_request(0).unwrap();
let rv = cluster_info.window_index_request(0, 0).unwrap();
assert_eq!(nxt.gossip, gossip_addr);
assert_eq!(rv.0, nxt.gossip);
@ -1365,7 +1389,7 @@ mod tests {
let mut two = false;
while !one || !two {
//this randomly picks an option, so eventually it should pick both
let rv = cluster_info.window_index_request(0).unwrap();
let rv = cluster_info.window_index_request(0, 0).unwrap();
if rv.0 == gossip_addr {
one = true;
}
@ -1393,8 +1417,14 @@ mod tests {
socketaddr!("127.0.0.1:1239"),
0,
);
let rv =
ClusterInfo::run_window_request(&me, &socketaddr_any!(), Some(&db_ledger), &me, 0);
let rv = ClusterInfo::run_window_request(
&me,
&socketaddr_any!(),
Some(&db_ledger),
&me,
0,
0,
);
assert!(rv.is_empty());
let data_size = 1;
let blob = SharedBlob::default();
@ -1410,8 +1440,14 @@ mod tests {
.write_shared_blobs(vec![&blob])
.expect("Expect successful ledger write");
let rv =
ClusterInfo::run_window_request(&me, &socketaddr_any!(), Some(&db_ledger), &me, 1);
let rv = ClusterInfo::run_window_request(
&me,
&socketaddr_any!(),
Some(&db_ledger),
&me,
2,
1,
);
assert!(!rv.is_empty());
let v = rv[0].clone();
assert_eq!(v.read().unwrap().index(), 1);

File diff suppressed because it is too large Load Diff

View File

@ -21,8 +21,49 @@ use std::sync::{Arc, RwLock};
pub const MAX_REPAIR_LENGTH: usize = 128;
pub fn generate_repairs(db_ledger: &DbLedger, max_repairs: usize) -> Result<Vec<(u64, u64)>> {
// Slot height and blob indexes for blobs we want to repair
let mut repairs: Vec<(u64, u64)> = vec![];
let mut slots = vec![0];
while repairs.len() < max_repairs && !slots.is_empty() {
let slot_height = slots.pop().unwrap();
let slot = db_ledger.meta(slot_height)?;
if slot.is_none() {
continue;
}
let slot = slot.unwrap();
slots.extend(slot.next_slots.clone());
if slot.contains_all_ticks(db_ledger) {
continue;
} else {
let num_unreceived_ticks = {
if slot.consumed == slot.received {
slot.num_expected_ticks(db_ledger) - slot.consumed_ticks
} else {
0
}
};
let upper = slot.received + num_unreceived_ticks;
let reqs = db_ledger.find_missing_data_indexes(
0,
slot.consumed,
upper,
max_repairs - repairs.len(),
);
repairs.extend(reqs.into_iter().map(|i| (slot_height, i)))
}
}
Ok(repairs)
}
pub fn repair(
db_ledger: &DbLedger,
slot_index: u64,
cluster_info: &Arc<RwLock<ClusterInfo>>,
id: &Pubkey,
times: usize,
@ -31,7 +72,8 @@ pub fn repair(
leader_scheduler_option: &Arc<RwLock<LeaderScheduler>>,
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
let rcluster_info = cluster_info.read().unwrap();
let meta = db_ledger.meta()?;
let is_next_leader = false;
let meta = db_ledger.meta(slot_index)?;
if meta.is_none() {
return Ok(vec![]);
}
@ -44,7 +86,7 @@ pub fn repair(
assert!(received > consumed);
// Check if we are the next next slot leader
let is_next_leader = {
{
let leader_scheduler = leader_scheduler_option.read().unwrap();
let next_slot = leader_scheduler.tick_height_to_slot(tick_height) + 1;
match leader_scheduler.get_leader_for_slot(next_slot) {
@ -88,7 +130,7 @@ pub fn repair(
let reqs: Vec<_> = idxs
.into_iter()
.filter_map(|pix| rcluster_info.window_index_request(pix).ok())
.filter_map(|pix| rcluster_info.window_index_request(slot_index, pix).ok())
.collect();
drop(rcluster_info);
@ -210,7 +252,9 @@ pub fn process_blob(
// If write_shared_blobs() of these recovered blobs fails fails, don't return
// because consumed_entries might be nonempty from earlier, and tick height needs to
// be updated. Hopefully we can recover these blobs next time successfully.
if let Err(e) = try_erasure(db_ledger, &mut consumed_entries) {
// TODO: Support per-slot erasure. Issue: https://github.com/solana-labs/solana/issues/2441
if let Err(e) = try_erasure(db_ledger, &mut consumed_entries, 0) {
trace!(
"erasure::recover failed to write recovered coding blobs. Err: {:?}",
e
@ -227,7 +271,7 @@ pub fn process_blob(
// then stop
if max_ix != 0 && !consumed_entries.is_empty() {
let meta = db_ledger
.meta()?
.meta(0)?
.expect("Expect metadata to exist if consumed entries is nonzero");
let consumed = meta.consumed;
@ -267,15 +311,19 @@ pub fn calculate_max_repair_entry_height(
}
#[cfg(feature = "erasure")]
fn try_erasure(db_ledger: &Arc<DbLedger>, consume_queue: &mut Vec<Entry>) -> Result<()> {
let meta = db_ledger.meta()?;
fn try_erasure(
db_ledger: &Arc<DbLedger>,
consume_queue: &mut Vec<Entry>,
slot_index: u64,
) -> Result<()> {
let meta = db_ledger.meta(slot_index)?;
if let Some(meta) = meta {
let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?;
let (data, coding) = erasure::recover(db_ledger, slot_index, meta.consumed)?;
for c in coding {
let c = c.read().unwrap();
db_ledger.put_coding_blob_bytes(
meta.consumed_slot,
0,
c.index(),
&c.data[..BLOB_HEADER_SIZE + c.size()],
)?;
@ -435,6 +483,79 @@ mod test {
assert!(blob_receiver.try_recv().is_err());
}
#[test]
pub fn test_generate_repairs() {
let db_ledger_path = get_tmp_ledger_path("test_generate_repairs");
let num_ticks_per_slot = 10;
let db_ledger_config = DbLedgerConfig::new(num_ticks_per_slot);
let db_ledger = DbLedger::open_config(&db_ledger_path, db_ledger_config).unwrap();
let num_entries_per_slot = 10;
let num_slots = 2;
let mut blobs = make_tiny_test_entries(num_slots * num_entries_per_slot).to_blobs();
// Insert every nth entry for each slot
let nth = 3;
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(((i % num_entries_per_slot) * nth) as u64);
b.set_slot((i / num_entries_per_slot) as u64);
}
db_ledger.write_blobs(&blobs).unwrap();
let missing_indexes_per_slot: Vec<u64> = (0..num_entries_per_slot - 1)
.flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64))
.collect();
let expected: Vec<(u64, u64)> = (0..num_slots)
.flat_map(|slot_height| {
missing_indexes_per_slot
.iter()
.map(move |blob_index| (slot_height as u64, *blob_index))
})
.collect();
// Across all slots, find all missing indexes in the range [0, num_entries_per_slot * nth]
assert_eq!(
generate_repairs(&db_ledger, std::usize::MAX).unwrap(),
expected
);
assert_eq!(
generate_repairs(&db_ledger, expected.len() - 2).unwrap()[..],
expected[0..expected.len() - 2]
);
// Now fill in all the holes for each slot such that for each slot, consumed == received.
// Because none of the slots contain ticks, we should see that the repair requests
// ask for ticks, starting from the last received index for that slot
for (slot_height, blob_index) in expected {
let mut b = make_tiny_test_entries(1).to_blobs().pop().unwrap();
b.set_index(blob_index);
b.set_slot(slot_height);
db_ledger.write_blobs(&vec![b]).unwrap();
}
let last_index_per_slot = ((num_entries_per_slot - 1) * nth) as u64;
let missing_indexes_per_slot: Vec<u64> =
(last_index_per_slot + 1..last_index_per_slot + 1 + num_ticks_per_slot).collect();
let expected: Vec<(u64, u64)> = (0..num_slots)
.flat_map(|slot_height| {
missing_indexes_per_slot
.iter()
.map(move |blob_index| (slot_height as u64, *blob_index))
})
.collect();
assert_eq!(
generate_repairs(&db_ledger, std::usize::MAX).unwrap(),
expected
);
assert_eq!(
generate_repairs(&db_ledger, expected.len() - 2).unwrap()[..],
expected[0..expected.len() - 2]
);
}
#[test]
pub fn test_find_missing_data_indexes_sanity() {
let slot = DEFAULT_SLOT_HEIGHT;
@ -564,6 +685,74 @@ mod test {
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_find_missing_data_indexes_slots() {
let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes_slots");
let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
let num_entries_per_slot = 10;
let num_slots = 2;
let mut blobs = make_tiny_test_entries(num_slots * num_entries_per_slot).to_blobs();
// Insert every nth entry for each slot
let nth = 3;
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(((i % num_entries_per_slot) * nth) as u64);
b.set_slot((i / num_entries_per_slot) as u64);
}
db_ledger.write_blobs(&blobs).unwrap();
let mut expected: Vec<u64> = (0..num_entries_per_slot)
.flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64))
.collect();
// For each slot, find all missing indexes in the range [0, num_entries_per_slot * nth]
for slot_height in 0..num_slots {
assert_eq!(
db_ledger.find_missing_data_indexes(
slot_height as u64,
0,
(num_entries_per_slot * nth) as u64,
num_entries_per_slot * nth as usize
),
expected,
);
}
// Test with a limit on the number of returned entries
for slot_height in 0..num_slots {
assert_eq!(
db_ledger.find_missing_data_indexes(
slot_height as u64,
0,
(num_entries_per_slot * nth) as u64,
num_entries_per_slot * (nth - 1)
)[..],
expected[..num_entries_per_slot * (nth - 1)],
);
}
// Try to find entries in the range [num_entries_per_slot * nth..num_entries_per_slot * (nth + 1)
// that don't exist in the ledger.
let extra_entries =
(num_entries_per_slot * nth) as u64..(num_entries_per_slot * (nth + 1)) as u64;
expected.extend(extra_entries);
// For each slot, find all missing indexes in the range [0, num_entries_per_slot * nth]
for slot_height in 0..num_slots {
assert_eq!(
db_ledger.find_missing_data_indexes(
slot_height as u64,
0,
(num_entries_per_slot * (nth + 1)) as u64,
num_entries_per_slot * (nth + 1),
),
expected,
);
}
}
#[test]
pub fn test_no_missing_blob_indexes() {
let slot = DEFAULT_SLOT_HEIGHT;
@ -577,13 +766,13 @@ mod test {
index_blobs(
&shared_blobs,
&Keypair::new().pubkey(),
0,
&mut 0,
&vec![slot; num_entries],
);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
db_ledger.write_blobs(&blobs).unwrap();
db_ledger.write_blobs(blobs).unwrap();
let empty: Vec<u64> = vec![];
for i in 0..num_entries as u64 {
@ -625,7 +814,8 @@ mod test {
let db_ledger = Arc::new(generate_db_ledger_from_window(&ledger_path, &window, false));
let mut consume_queue = vec![];
try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt");
try_erasure(&db_ledger, &mut consume_queue, DEFAULT_SLOT_HEIGHT)
.expect("Expected successful erasure attempt");
window[erased_index].data = erased_data;
{
@ -668,7 +858,7 @@ mod test {
index_blobs(
&shared_blobs,
&Keypair::new().pubkey(),
0,
&mut 0,
&vec![DEFAULT_SLOT_HEIGHT; num_entries],
);

View File

@ -893,7 +893,7 @@ pub mod test {
index_blobs(
&blobs,
&Keypair::new().pubkey(),
offset as u64,
&mut (offset as u64),
&vec![slot; blobs.len()],
);
@ -911,7 +911,7 @@ pub mod test {
index_blobs(
&blobs,
&Keypair::new().pubkey(),
offset as u64,
&mut (offset as u64),
&vec![DEFAULT_SLOT_HEIGHT; blobs.len()],
);
blobs

View File

@ -3,7 +3,7 @@
use crate::bank::Bank;
use crate::cluster_info::{ClusterInfo, Node, NodeInfo};
use crate::counter::Counter;
use crate::db_ledger::DbLedger;
use crate::db_ledger::{DbLedger, DbLedgerConfig};
use crate::genesis_block::GenesisBlock;
use crate::gossip_service::GossipService;
use crate::leader_scheduler::LeaderSchedulerConfig;
@ -64,6 +64,7 @@ pub struct FullnodeConfig {
pub entry_stream: Option<String>,
pub storage_rotate_count: u64,
pub leader_scheduler_config: LeaderSchedulerConfig,
pub ledger_config: DbLedgerConfig,
}
impl Default for FullnodeConfig {
fn default() -> Self {
@ -77,10 +78,18 @@ impl Default for FullnodeConfig {
entry_stream: None,
storage_rotate_count: NUM_HASHES_FOR_STORAGE_ROTATE,
leader_scheduler_config: Default::default(),
ledger_config: Default::default(),
}
}
}
impl FullnodeConfig {
pub fn set_leader_scheduler_config(&mut self, config: LeaderSchedulerConfig) {
self.ledger_config.ticks_per_slot = config.ticks_per_slot;
self.leader_scheduler_config = config;
}
}
pub struct Fullnode {
id: Pubkey,
exit: Arc<AtomicBool>,
@ -107,6 +116,8 @@ impl Fullnode {
entrypoint_info_option: Option<&NodeInfo>,
config: &FullnodeConfig,
) -> Self {
info!("creating bank...");
let id = keypair.pubkey();
assert_eq!(id, node.info.id);
@ -117,7 +128,11 @@ impl Fullnode {
db_ledger,
ledger_signal_sender,
ledger_signal_receiver,
) = new_bank_from_ledger(ledger_path, &config.leader_scheduler_config);
) = new_bank_from_ledger(
ledger_path,
config.ledger_config,
&config.leader_scheduler_config,
);
info!("node info: {:?}", node.info);
info!("node entrypoint_info: {:?}", entrypoint_info_option);
@ -184,7 +199,7 @@ impl Fullnode {
}
// Get the scheduled leader
let (scheduled_leader, max_tpu_tick_height) = {
let (scheduled_leader, slot_height, max_tpu_tick_height) = {
let tick_height = bank.tick_height();
let leader_scheduler = bank.leader_scheduler.read().unwrap();
@ -193,6 +208,7 @@ impl Fullnode {
leader_scheduler
.get_leader_for_slot(slot)
.expect("Leader not known after processing bank"),
slot,
tick_height + leader_scheduler.num_ticks_left_in_slot(tick_height),
)
};
@ -235,9 +251,12 @@ impl Fullnode {
let (to_leader_sender, to_leader_receiver) = channel();
let (to_validator_sender, to_validator_receiver) = channel();
let blob_index = Self::get_consumed_for_slot(&db_ledger, slot_height);
let (tvu, blob_sender) = Tvu::new(
voting_keypair_option,
&bank,
blob_index,
entry_height,
last_entry_id,
&cluster_info,
@ -263,7 +282,7 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone broadcast socket"),
cluster_info.clone(),
entry_height,
blob_index,
config.sigverify_disabled,
max_tpu_tick_height,
&last_entry_id,
@ -318,8 +337,8 @@ impl Fullnode {
if scheduled_leader == self.id {
debug!("node is still the leader");
let (last_entry_id, entry_height) = self.node_services.tvu.get_state();
self.validator_to_leader(tick_height, entry_height, last_entry_id);
let last_entry_id = self.node_services.tvu.get_state();
self.validator_to_leader(tick_height, last_entry_id);
FullnodeReturnType::LeaderToLeaderRotation
} else {
debug!("new leader is {}", scheduled_leader);
@ -334,12 +353,11 @@ impl Fullnode {
}
}
fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_entry_id: Hash) {
pub fn validator_to_leader(&mut self, tick_height: u64, last_entry_id: Hash) {
trace!(
"validator_to_leader({:?}): tick_height={} entry_height={} last_entry_id={}",
"validator_to_leader({:?}): tick_height={} last_entry_id={}",
self.id,
tick_height,
entry_height,
last_entry_id,
);
@ -382,7 +400,7 @@ impl Fullnode {
self.cluster_info.clone(),
self.sigverify_disabled,
max_tick_height,
entry_height,
0,
&last_entry_id,
self.id,
&to_validator_sender,
@ -409,8 +427,8 @@ impl Fullnode {
} else {
let should_be_leader = self.to_leader_receiver.recv_timeout(timeout);
match should_be_leader {
Ok(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => {
self.validator_to_leader(tick_height, entry_height, last_entry_id);
Ok(TvuReturnType::LeaderRotation(tick_height, last_entry_id)) => {
self.validator_to_leader(tick_height, last_entry_id);
return Some((
FullnodeReturnType::ValidatorToLeaderRotation,
tick_height + 1,
@ -471,14 +489,24 @@ impl Fullnode {
self.exit();
self.join()
}
fn get_consumed_for_slot(db_ledger: &DbLedger, slot_index: u64) -> u64 {
let meta = db_ledger.meta(slot_index).expect("Database error");
if let Some(meta) = meta {
meta.consumed
} else {
0
}
}
}
pub fn new_bank_from_ledger(
ledger_path: &str,
ledger_config: DbLedgerConfig,
leader_scheduler_config: &LeaderSchedulerConfig,
) -> (Bank, u64, Hash, DbLedger, SyncSender<bool>, Receiver<bool>) {
let (db_ledger, ledger_signal_sender, ledger_signal_receiver) =
DbLedger::open_with_signal(ledger_path)
DbLedger::open_with_config_signal(ledger_path, ledger_config)
.expect("Expected to successfully open database ledger");
let genesis_block =
GenesisBlock::load(ledger_path).expect("Expected to successfully open genesis block");
@ -525,7 +553,7 @@ impl Service for Fullnode {
#[cfg(test)]
mod tests {
use super::*;
use crate::db_ledger::{create_tmp_sample_ledger, tmp_copy_ledger, DEFAULT_SLOT_HEIGHT};
use crate::db_ledger::{create_tmp_sample_ledger, tmp_copy_ledger};
use crate::entry::make_consecutive_blobs;
use crate::leader_scheduler::make_active_set_entries;
use crate::streamer::responder;
@ -626,7 +654,7 @@ mod tests {
let voting_keypair = VotingKeypair::new_local(&bootstrap_leader_keypair);
// Start the bootstrap leader
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.leader_scheduler_config = leader_scheduler_config;
fullnode_config.set_leader_scheduler_config(leader_scheduler_config);
let bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_keypair,
@ -655,11 +683,11 @@ mod tests {
let mut fullnode_config = FullnodeConfig::default();
let ticks_per_slot = 16;
let slots_per_epoch = 2;
fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new(
fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new(
ticks_per_slot,
slots_per_epoch,
ticks_per_slot * slots_per_epoch,
);
));
// Create the leader and validator nodes
let bootstrap_leader_keypair = Arc::new(Keypair::new());
@ -673,6 +701,7 @@ mod tests {
// tick_height = 0 from the leader scheduler's active window
ticks_per_slot * 4,
"test_wrong_role_transition",
ticks_per_slot,
);
let bootstrap_leader_info = bootstrap_leader_node.info.clone();
@ -726,6 +755,8 @@ mod tests {
fn test_validator_to_leader_transition() {
solana_logger::setup();
// Make leader and validator node
let ticks_per_slot = 10;
let slots_per_epoch = 4;
let leader_keypair = Arc::new(Keypair::new());
let validator_keypair = Arc::new(Keypair::new());
let (leader_node, validator_node, validator_ledger_path, ledger_initial_len, last_id) =
@ -735,6 +766,7 @@ mod tests {
0,
0,
"test_validator_to_leader_transition",
ticks_per_slot,
);
let leader_id = leader_keypair.pubkey();
@ -744,17 +776,15 @@ mod tests {
info!("validator: {:?}", validator_info.id);
// Set the leader scheduler for the validator
let ticks_per_slot = 10;
let slots_per_epoch = 4;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new(
fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new(
ticks_per_slot,
slots_per_epoch,
ticks_per_slot * slots_per_epoch,
);
));
let voting_keypair = VotingKeypair::new_local(&validator_keypair);
// Start the validator
let validator = Fullnode::new(
validator_node,
@ -805,8 +835,11 @@ mod tests {
// Close the validator so that rocksdb has locks available
validator_exit();
let (bank, entry_height, _, _, _, _) =
new_bank_from_ledger(&validator_ledger_path, &LeaderSchedulerConfig::default());
let (bank, entry_height, _, _, _, _) = new_bank_from_ledger(
&validator_ledger_path,
DbLedgerConfig::default(),
&LeaderSchedulerConfig::default(),
);
assert!(bank.tick_height() >= bank.leader_scheduler.read().unwrap().ticks_per_epoch);
@ -825,27 +858,32 @@ mod tests {
solana_logger::setup();
// Make leader node
let ticks_per_slot = 5;
let slots_per_epoch = 2;
let leader_keypair = Arc::new(Keypair::new());
let validator_keypair = Arc::new(Keypair::new());
info!("leader: {:?}", leader_keypair.pubkey());
info!("validator: {:?}", validator_keypair.pubkey());
let (leader_node, _, leader_ledger_path, _, _) =
setup_leader_validator(&leader_keypair, &validator_keypair, 1, 0, "test_tvu_behind");
let (leader_node, _, leader_ledger_path, _, _) = setup_leader_validator(
&leader_keypair,
&validator_keypair,
1,
0,
"test_tvu_behind",
ticks_per_slot,
);
let leader_node_info = leader_node.info.clone();
// Set the leader scheduler for the validator
let ticks_per_slot = 5;
let slots_per_epoch = 2;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new(
fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new(
ticks_per_slot,
slots_per_epoch,
ticks_per_slot * slots_per_epoch,
);
));
let voting_keypair = VotingKeypair::new_local(&leader_keypair);
info!("Start the bootstrap leader");
@ -914,11 +952,13 @@ mod tests {
num_genesis_ticks: u64,
num_ending_ticks: u64,
test_name: &str,
ticks_per_block: u64,
) -> (Node, Node, String, u64, Hash) {
// Make a leader identity
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
// Create validator identity
assert!(num_genesis_ticks <= ticks_per_block);
let (mint_keypair, ledger_path, genesis_entry_height, last_id) = create_tmp_sample_ledger(
test_name,
10_000,
@ -946,14 +986,33 @@ mod tests {
num_ending_ticks,
);
let non_tick_active_entries_len = active_set_entries.len() - num_ending_ticks as usize;
let remaining_ticks_in_zeroth_slot = ticks_per_block - num_genesis_ticks;
let entries_for_zeroth_slot =
non_tick_active_entries_len + remaining_ticks_in_zeroth_slot as usize;
let entry_chunks: Vec<_> = active_set_entries[entries_for_zeroth_slot..]
.chunks(ticks_per_block as usize)
.collect();
let db_ledger = DbLedger::open(&ledger_path).unwrap();
db_ledger
.write_entries(
DEFAULT_SLOT_HEIGHT,
genesis_entry_height,
&active_set_entries,
)
.unwrap();
// Iterate writing slots through 0..entry_chunks.len()
for i in 0..entry_chunks.len() + 1 {
let (start_height, entries) = {
if i == 0 {
(
genesis_entry_height,
&active_set_entries[..entries_for_zeroth_slot],
)
} else {
(0, entry_chunks[i - 1])
}
};
db_ledger
.write_entries(i as u64, start_height, entries)
.unwrap();
}
let entry_height = genesis_entry_height + active_set_entries.len() as u64;
(

View File

@ -50,6 +50,7 @@ pub mod poh;
pub mod poh_recorder;
pub mod poh_service;
pub mod recvmmsg;
pub mod repair_service;
pub mod replay_stage;
pub mod replicator;
pub mod result;

View File

@ -442,16 +442,15 @@ impl Blob {
}
}
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slots: &[u64]) {
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: &mut u64, slots: &[u64]) {
// enumerate all the blobs, those are the indices
for (blob, slot) in blobs.iter().zip(slots) {
let mut blob = blob.write().unwrap();
blob.set_index(index);
blob.set_index(*blob_index);
blob.set_slot(*slot);
blob.set_id(id);
index += 1;
*blob_index += 1;
}
}

372
src/repair_service.rs Normal file
View File

@ -0,0 +1,372 @@
//! The `repair_service` module implements the tools necessary to generate a thread which
//! regularly finds missing blobs in the ledger and sends repair requests for those blobs
use crate::cluster_info::ClusterInfo;
use crate::db_ledger::{DbLedger, SlotMeta};
use crate::result::Result;
use crate::service::Service;
use solana_metrics::{influxdb, submit};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
pub const MAX_REPAIR_LENGTH: usize = 16;
pub const REPAIR_MS: u64 = 100;
pub const MAX_REPAIR_TRIES: u64 = 128;
#[derive(Default)]
struct RepairInfo {
max_slot: u64,
repair_tries: u64,
}
impl RepairInfo {
fn new() -> Self {
RepairInfo {
max_slot: 0,
repair_tries: 0,
}
}
}
pub struct RepairService {
t_repair: JoinHandle<()>,
}
impl RepairService {
fn run(
db_ledger: &Arc<DbLedger>,
exit: &Arc<AtomicBool>,
repair_socket: &Arc<UdpSocket>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
) {
let mut repair_info = RepairInfo::new();
let id = cluster_info.read().unwrap().id();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let repairs = Self::generate_repairs(db_ledger, MAX_REPAIR_LENGTH, &mut repair_info);
if let Ok(repairs) = repairs {
let reqs: Vec<_> = repairs
.into_iter()
.filter_map(|(slot_height, blob_index)| {
cluster_info
.read()
.unwrap()
.window_index_request(slot_height, blob_index)
.map(|result| (result, slot_height, blob_index))
.ok()
})
.collect();
for ((to, req), slot_height, blob_index) in reqs {
if let Ok(local_addr) = repair_socket.local_addr() {
submit(
influxdb::Point::new("repair_service")
.add_field(
"repair_slot",
influxdb::Value::Integer(slot_height as i64),
)
.to_owned()
.add_field(
"repair_blob",
influxdb::Value::Integer(blob_index as i64),
)
.to_owned()
.add_field("to", influxdb::Value::String(to.to_string()))
.to_owned()
.add_field("from", influxdb::Value::String(local_addr.to_string()))
.to_owned()
.add_field("id", influxdb::Value::String(id.to_string()))
.to_owned(),
);
}
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
0
});
}
}
sleep(Duration::from_millis(REPAIR_MS));
}
}
pub fn new(
db_ledger: Arc<DbLedger>,
exit: Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
) -> Self {
let t_repair = Builder::new()
.name("solana-repair-service".to_string())
.spawn(move || Self::run(&db_ledger, &exit, &repair_socket, &cluster_info))
.unwrap();
RepairService { t_repair }
}
fn process_slot(
db_ledger: &DbLedger,
slot_height: u64,
slot: &SlotMeta,
max_repairs: usize,
) -> Result<Vec<(u64, u64)>> {
if slot.contains_all_ticks(db_ledger) {
Ok(vec![])
} else {
let num_unreceived_ticks = {
if slot.consumed == slot.received {
let num_expected_ticks = slot.num_expected_ticks(db_ledger);
if num_expected_ticks == 0 {
// This signals that we have received nothing for this slot, try to get at least the
// first entry
1
}
// This signals that we will never use other slots (leader rotation is
// off)
else if num_expected_ticks == std::u64::MAX
|| num_expected_ticks <= slot.consumed_ticks
{
0
} else {
num_expected_ticks - slot.consumed_ticks
}
} else {
0
}
};
let upper = slot.received + num_unreceived_ticks;
let reqs =
db_ledger.find_missing_data_indexes(slot_height, slot.consumed, upper, max_repairs);
Ok(reqs.into_iter().map(|i| (slot_height, i)).collect())
}
}
fn generate_repairs(
db_ledger: &DbLedger,
max_repairs: usize,
repair_info: &mut RepairInfo,
) -> Result<Vec<(u64, u64)>> {
// Slot height and blob indexes for blobs we want to repair
let mut repairs: Vec<(u64, u64)> = vec![];
let mut current_slot_height = Some(0);
while repairs.len() < max_repairs && current_slot_height.is_some() {
if current_slot_height.unwrap() > repair_info.max_slot {
repair_info.repair_tries = 0;
repair_info.max_slot = current_slot_height.unwrap();
}
let slot = db_ledger.meta(current_slot_height.unwrap())?;
if slot.is_none() {
current_slot_height = db_ledger.get_next_slot(current_slot_height.unwrap())?;
continue;
}
let slot = slot.unwrap();
let new_repairs = Self::process_slot(
db_ledger,
current_slot_height.unwrap(),
&slot,
max_repairs - repairs.len(),
)?;
repairs.extend(new_repairs);
current_slot_height = db_ledger.get_next_slot(current_slot_height.unwrap())?;
}
// Only increment repair_tries if the ledger contains every blob for every slot
if repairs.is_empty() {
repair_info.repair_tries += 1;
}
// Optimistically try the next slot if we haven't gotten any repairs
// for a while
if repair_info.repair_tries >= MAX_REPAIR_TRIES {
repairs.push((repair_info.max_slot + 1, 0))
}
Ok(repairs)
}
}
impl Service for RepairService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_repair.join()
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::db_ledger::{get_tmp_ledger_path, DbLedger, DbLedgerConfig};
use crate::entry::create_ticks;
use crate::entry::{make_tiny_test_entries, EntrySlice};
use solana_sdk::hash::Hash;
#[test]
pub fn test_repair_missed_future_slot() {
let db_ledger_path = get_tmp_ledger_path("test_repair_missed_future_slot");
{
let num_ticks_per_slot = 1;
let db_ledger_config = DbLedgerConfig::new(num_ticks_per_slot);
let db_ledger = DbLedger::open_config(&db_ledger_path, db_ledger_config).unwrap();
let mut blobs = create_ticks(1, Hash::default()).to_blobs();
blobs[0].set_index(0);
blobs[0].set_slot(0);
db_ledger.write_blobs(&blobs).unwrap();
let mut repair_info = RepairInfo::new();
// We have all the blobs for all the slots in the ledger, wait for optimistic
// future repair after MAX_REPAIR_TRIES
for i in 0..MAX_REPAIR_TRIES {
// Check that repair tries to patch the empty slot
assert_eq!(repair_info.repair_tries, i);
assert_eq!(repair_info.max_slot, 0);
let expected = if i == MAX_REPAIR_TRIES - 1 {
vec![(1, 0)]
} else {
vec![]
};
assert_eq!(
RepairService::generate_repairs(&db_ledger, 2, &mut repair_info).unwrap(),
expected
);
}
// Insert a bigger blob
let mut blobs = create_ticks(1, Hash::default()).to_blobs();
blobs[0].set_index(0);
blobs[0].set_slot(1);
db_ledger.write_blobs(&blobs).unwrap();
assert_eq!(
RepairService::generate_repairs(&db_ledger, 2, &mut repair_info).unwrap(),
vec![]
);
assert_eq!(repair_info.repair_tries, 1);
assert_eq!(repair_info.max_slot, 1);
}
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_repair_empty_slot() {
let db_ledger_path = get_tmp_ledger_path("test_repair_empty_slot");
{
let num_ticks_per_slot = 10;
let db_ledger_config = DbLedgerConfig::new(num_ticks_per_slot);
let db_ledger = DbLedger::open_config(&db_ledger_path, db_ledger_config).unwrap();
let mut blobs = make_tiny_test_entries(1).to_blobs();
blobs[0].set_index(1);
blobs[0].set_slot(2);
let mut repair_info = RepairInfo::new();
// Write this blob to slot 2, should chain to slot 1, which we haven't received
// any blobs for
db_ledger.write_blobs(&blobs).unwrap();
// Check that repair tries to patch the empty slot
assert_eq!(
RepairService::generate_repairs(&db_ledger, 2, &mut repair_info).unwrap(),
vec![(1, 0), (2, 0)]
);
}
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_generate_repairs() {
let db_ledger_path = get_tmp_ledger_path("test_generate_repairs");
{
let num_ticks_per_slot = 10;
let db_ledger_config = DbLedgerConfig::new(num_ticks_per_slot);
let db_ledger = DbLedger::open_config(&db_ledger_path, db_ledger_config).unwrap();
let num_entries_per_slot = 10;
let num_slots = 2;
let mut blobs = make_tiny_test_entries(num_slots * num_entries_per_slot).to_blobs();
let mut repair_info = RepairInfo::new();
// Insert every nth entry for each slot
let nth = 3;
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(((i % num_entries_per_slot) * nth) as u64);
b.set_slot((i / num_entries_per_slot) as u64);
}
db_ledger.write_blobs(&blobs).unwrap();
let missing_indexes_per_slot: Vec<u64> = (0..num_entries_per_slot - 1)
.flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64))
.collect();
let expected: Vec<(u64, u64)> = (0..num_slots)
.flat_map(|slot_height| {
missing_indexes_per_slot
.iter()
.map(move |blob_index| (slot_height as u64, *blob_index))
})
.collect();
// Across all slots, find all missing indexes in the range [0, num_entries_per_slot * nth]
assert_eq!(
RepairService::generate_repairs(&db_ledger, std::usize::MAX, &mut repair_info)
.unwrap(),
expected
);
assert_eq!(
RepairService::generate_repairs(&db_ledger, expected.len() - 2, &mut repair_info)
.unwrap()[..],
expected[0..expected.len() - 2]
);
// Now fill in all the holes for each slot such that for each slot, consumed == received.
// Because none of the slots contain ticks, we should see that the repair requests
// ask for ticks, starting from the last received index for that slot
for (slot_height, blob_index) in expected {
let mut b = make_tiny_test_entries(1).to_blobs().pop().unwrap();
b.set_index(blob_index);
b.set_slot(slot_height);
db_ledger.write_blobs(&vec![b]).unwrap();
}
let last_index_per_slot = ((num_entries_per_slot - 1) * nth) as u64;
let missing_indexes_per_slot: Vec<u64> =
(last_index_per_slot + 1..last_index_per_slot + 1 + num_ticks_per_slot).collect();
let expected: Vec<(u64, u64)> = (0..num_slots)
.flat_map(|slot_height| {
missing_indexes_per_slot
.iter()
.map(move |blob_index| (slot_height as u64, *blob_index))
})
.collect();
assert_eq!(
RepairService::generate_repairs(&db_ledger, std::usize::MAX, &mut repair_info)
.unwrap(),
expected
);
assert_eq!(
RepairService::generate_repairs(&db_ledger, expected.len() - 2, &mut repair_info)
.unwrap()[..],
expected[0..expected.len() - 2]
);
}
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
}

View File

@ -62,7 +62,7 @@ impl ReplayStage {
cluster_info: &Arc<RwLock<ClusterInfo>>,
voting_keypair: Option<&Arc<VotingKeypair>>,
ledger_entry_sender: &EntrySender,
entry_height: &Arc<RwLock<u64>>,
current_blob_index: &mut u64,
last_entry_id: &Arc<RwLock<Hash>>,
entry_stream: Option<&mut EntryStream>,
) -> Result<()> {
@ -162,8 +162,7 @@ impl ReplayStage {
ledger_entry_sender.send(entries)?;
}
*entry_height.write().unwrap() += entries_len;
*current_blob_index += entries_len;
res?;
inc_new_counter_info!(
"replicate_stage-duration",
@ -180,7 +179,7 @@ impl ReplayStage {
bank: Arc<Bank>,
cluster_info: Arc<RwLock<ClusterInfo>>,
exit: Arc<AtomicBool>,
entry_height: Arc<RwLock<u64>>,
mut current_blob_index: u64,
last_entry_id: Arc<RwLock<Hash>>,
to_leader_sender: TvuRotationSender,
entry_stream: Option<&String>,
@ -196,14 +195,14 @@ impl ReplayStage {
.spawn(move || {
let _exit = Finalizer::new(exit_.clone());
let mut last_leader_id = Self::get_leader_for_next_tick(&bank);
let mut prev_slot = None;
let (mut current_slot, mut max_tick_height_for_slot) = {
let tick_height = bank.tick_height();
let leader_scheduler = bank.leader_scheduler.read().unwrap();
let current_slot = leader_scheduler.tick_height_to_slot(tick_height + 1);
let first_tick_in_current_slot = current_slot * leader_scheduler.ticks_per_slot;
(
current_slot,
Some(current_slot),
first_tick_in_current_slot
+ leader_scheduler.num_ticks_left_in_slot(first_tick_in_current_slot),
)
@ -217,15 +216,35 @@ impl ReplayStage {
break;
}
let current_entry_height = *entry_height.read().unwrap();
if current_slot.is_none() {
let new_slot = Self::get_next_slot(
&db_ledger,
prev_slot.expect("prev_slot must exist"),
);
if new_slot.is_some() {
// Reset the state
current_slot = new_slot;
current_blob_index = 0;
let leader_scheduler = bank.leader_scheduler.read().unwrap();
let first_tick_in_current_slot =
current_slot.unwrap() * leader_scheduler.ticks_per_slot;
max_tick_height_for_slot = first_tick_in_current_slot
+ leader_scheduler
.num_ticks_left_in_slot(first_tick_in_current_slot);
}
}
let entries = {
if let Ok(entries) = db_ledger.get_slot_entries(
current_slot,
current_entry_height,
Some(MAX_ENTRY_RECV_PER_ITER as u64),
) {
entries
if let Some(slot) = current_slot {
if let Ok(entries) = db_ledger.get_slot_entries(
slot,
current_blob_index,
Some(MAX_ENTRY_RECV_PER_ITER as u64),
) {
entries
} else {
vec![]
}
} else {
vec![]
}
@ -240,7 +259,7 @@ impl ReplayStage {
&cluster_info,
voting_keypair.as_ref(),
&ledger_entry_sender,
&entry_height,
&mut current_blob_index,
&last_entry_id,
entry_stream.as_mut(),
) {
@ -262,16 +281,16 @@ impl ReplayStage {
to_leader_sender
.send(TvuReturnType::LeaderRotation(
current_tick_height,
*entry_height.read().unwrap(),
*last_entry_id.read().unwrap(),
))
.unwrap();
}
current_slot += 1;
max_tick_height_for_slot +=
bank.leader_scheduler.read().unwrap().ticks_per_slot;
// Check for any slots that chain to this one
prev_slot = current_slot;
current_slot = None;
last_leader_id = leader_id;
continue;
}
}
@ -313,6 +332,12 @@ impl ReplayStage {
.get_leader_for_slot(slot)
.expect("Scheduled leader should be calculated by this point")
}
fn get_next_slot(db_ledger: &DbLedger, slot_index: u64) -> Option<u64> {
// Find the next slot that chains to the old slot
let next_slots = db_ledger.get_slots_since(&[slot_index]).expect("Db error");
next_slots.first().cloned()
}
}
impl Service for ReplayStage {
@ -328,8 +353,9 @@ mod test {
use super::*;
use crate::bank::Bank;
use crate::cluster_info::{ClusterInfo, Node};
use crate::db_ledger::create_tmp_sample_ledger;
use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT};
use crate::db_ledger::{
create_tmp_sample_ledger, DbLedger, DbLedgerConfig, DEFAULT_SLOT_HEIGHT,
};
use crate::entry::create_ticks;
use crate::entry::Entry;
use crate::fullnode::new_bank_from_ledger;
@ -389,6 +415,7 @@ mod test {
0,
);
last_id = active_set_entries.last().unwrap().id;
{
let db_ledger = DbLedger::open(&my_ledger_path).unwrap();
db_ledger
@ -402,12 +429,13 @@ mod test {
{
// Set up the bank
let db_ledger_config = DbLedgerConfig::new(ticks_per_slot);
let (bank, _entry_height, last_entry_id, db_ledger, l_sender, l_receiver) =
new_bank_from_ledger(&my_ledger_path, &leader_scheduler_config);
new_bank_from_ledger(&my_ledger_path, db_ledger_config, &leader_scheduler_config);
// Set up the replay stage
let (rotation_sender, rotation_receiver) = channel();
let meta = db_ledger.meta().unwrap().unwrap();
let meta = db_ledger.meta(0).unwrap().unwrap();
let exit = Arc::new(AtomicBool::new(false));
let bank = Arc::new(bank);
let db_ledger = Arc::new(db_ledger);
@ -418,7 +446,7 @@ mod test {
bank.clone(),
Arc::new(RwLock::new(cluster_info_me)),
exit.clone(),
Arc::new(RwLock::new(meta.consumed)),
meta.consumed,
Arc::new(RwLock::new(last_entry_id)),
rotation_sender,
None,
@ -434,7 +462,6 @@ mod test {
entries_to_send.push(entry);
}
let expected_entry_height = (active_set_entries.len() + total_entries_to_send) as u64;
let expected_last_id = entries_to_send.last().unwrap().id;
// Write the entries to the ledger, replay_stage should get notified of changes
@ -446,7 +473,6 @@ mod test {
assert_eq!(
Some(TvuReturnType::LeaderRotation(
2 * ticks_per_slot - 1,
expected_entry_height,
expected_last_id,
)),
{
@ -507,7 +533,11 @@ mod test {
let (to_leader_sender, _) = channel();
{
let (bank, entry_height, last_entry_id, db_ledger, l_sender, l_receiver) =
new_bank_from_ledger(&my_ledger_path, &LeaderSchedulerConfig::default());
new_bank_from_ledger(
&my_ledger_path,
DbLedgerConfig::default(),
&LeaderSchedulerConfig::default(),
);
let bank = Arc::new(bank);
let db_ledger = Arc::new(db_ledger);
@ -518,7 +548,7 @@ mod test {
bank.clone(),
cluster_info_me.clone(),
exit.clone(),
Arc::new(RwLock::new(entry_height)),
entry_height,
Arc::new(RwLock::new(last_entry_id)),
to_leader_sender,
None,
@ -581,8 +611,6 @@ mod test {
make_active_set_entries(&my_keypair, &mint_keypair, 100, 1, &last_id, &last_id, 0);
let mut last_id = active_set_entries.last().unwrap().id;
let initial_tick_height = genesis_entry_height;
let active_set_entries_len = active_set_entries.len() as u64;
let initial_non_tick_height = genesis_entry_height - initial_tick_height;
{
let db_ledger = DbLedger::open(&my_ledger_path).unwrap();
@ -608,11 +636,12 @@ mod test {
let (rotation_tx, rotation_rx) = channel();
let exit = Arc::new(AtomicBool::new(false));
{
let db_ledger_config = DbLedgerConfig::new(ticks_per_slot);
let (bank, _entry_height, last_entry_id, db_ledger, l_sender, l_receiver) =
new_bank_from_ledger(&my_ledger_path, &leader_scheduler_config);
new_bank_from_ledger(&my_ledger_path, db_ledger_config, &leader_scheduler_config);
let meta = db_ledger
.meta()
.meta(0)
.unwrap()
.expect("First slot metadata must exist");
@ -626,7 +655,7 @@ mod test {
bank.clone(),
cluster_info_me.clone(),
exit.clone(),
Arc::new(RwLock::new(meta.consumed)),
meta.consumed,
Arc::new(RwLock::new(last_entry_id)),
rotation_tx,
None,
@ -642,10 +671,6 @@ mod test {
let total_entries_to_send = (active_window_tick_length - initial_tick_height) as usize;
let num_hashes = 1;
// Add on the only entries that weren't ticks to the bootstrap height to get the
// total expected entry length
let expected_entry_height =
active_window_tick_length + initial_non_tick_height + active_set_entries_len;
let leader_rotation_index =
(active_window_tick_length - initial_tick_height - 1) as usize;
let mut expected_last_id = Hash::default();
@ -674,7 +699,6 @@ mod test {
assert_eq!(
Some(TvuReturnType::LeaderRotation(
active_window_tick_length,
expected_entry_height,
expected_last_id,
)),
{
@ -704,8 +728,7 @@ mod test {
let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
let (ledger_entry_sender, _ledger_entry_receiver) = channel();
let last_entry_id = Hash::default();
let entry_height = 0;
let mut current_blob_index = 0;
let mut last_id = Hash::default();
let mut entries = Vec::new();
for _ in 0..5 {
@ -722,7 +745,7 @@ mod test {
&cluster_info_me,
Some(&voting_keypair),
&ledger_entry_sender,
&Arc::new(RwLock::new(entry_height)),
&mut current_blob_index,
&Arc::new(RwLock::new(last_entry_id)),
None,
);
@ -744,7 +767,7 @@ mod test {
&cluster_info_me,
Some(&voting_keypair),
&ledger_entry_sender,
&Arc::new(RwLock::new(entry_height)),
&mut current_blob_index,
&Arc::new(RwLock::new(last_entry_id)),
None,
);
@ -774,7 +797,7 @@ mod test {
let (ledger_entry_sender, _ledger_entry_receiver) = channel();
let last_entry_id = Hash::default();
let entry_height = 0;
let mut entry_height = 0;
let mut last_id = Hash::default();
let mut entries = Vec::new();
let mut expected_entries = Vec::new();
@ -794,7 +817,7 @@ mod test {
&cluster_info_me,
Some(&voting_keypair),
&ledger_entry_sender,
&Arc::new(RwLock::new(entry_height)),
&mut entry_height,
&Arc::new(RwLock::new(last_entry_id)),
Some(&mut entry_stream),
)

View File

@ -12,7 +12,7 @@ use crate::service::Service;
use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT};
use crate::streamer::BlobReceiver;
use crate::thin_client::{retry_get_balance, ThinClient};
use crate::window_service::window_service;
use crate::window_service::WindowService;
use rand::thread_rng;
use rand::Rng;
use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT};
@ -33,13 +33,12 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
pub struct Replicator {
gossip_service: GossipService,
fetch_stage: BlobFetchStage,
t_window: JoinHandle<()>,
window_service: WindowService,
pub retransmit_receiver: BlobReceiver,
exit: Arc<AtomicBool>,
entry_height: u64,
@ -173,11 +172,10 @@ impl Replicator {
// todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel();
let t_window = window_service(
let window_service = WindowService::new(
db_ledger.clone(),
cluster_info.clone(),
0,
entry_height,
max_entry_height,
blob_fetch_receiver,
retransmit_sender,
@ -274,7 +272,7 @@ impl Replicator {
Ok(Self {
gossip_service,
fetch_stage,
t_window,
window_service,
retransmit_receiver,
exit,
entry_height,
@ -289,7 +287,7 @@ impl Replicator {
pub fn join(self) {
self.gossip_service.join().unwrap();
self.fetch_stage.join().unwrap();
self.t_window.join().unwrap();
self.window_service.join().unwrap();
// Drain the queue here to prevent self.retransmit_receiver from being dropped
// before the window_service thread is joined

View File

@ -8,7 +8,7 @@ use crate::leader_scheduler::LeaderScheduler;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::streamer::BlobReceiver;
use crate::window_service::window_service;
use crate::window_service::WindowService;
use log::Level;
use solana_metrics::{influxdb, submit};
use std::net::UdpSocket;
@ -119,16 +119,16 @@ fn retransmitter(
pub struct RetransmitStage {
thread_hdls: Vec<JoinHandle<()>>,
window_service: WindowService,
}
impl RetransmitStage {
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
#[allow(clippy::new_ret_no_self)]
pub fn new(
bank: &Arc<Bank>,
db_ledger: Arc<DbLedger>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
tick_height: u64,
entry_height: u64,
retransmit_socket: Arc<UdpSocket>,
repair_socket: Arc<UdpSocket>,
fetch_stage_receiver: BlobReceiver,
@ -144,11 +144,10 @@ impl RetransmitStage {
retransmit_receiver,
);
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(
let window_service = WindowService::new(
db_ledger,
cluster_info.clone(),
tick_height,
entry_height,
0,
fetch_stage_receiver,
retransmit_sender,
@ -158,8 +157,11 @@ impl RetransmitStage {
exit,
);
let thread_hdls = vec![t_retransmit, t_window];
Self { thread_hdls }
let thread_hdls = vec![t_retransmit];
Self {
thread_hdls,
window_service,
}
}
}
@ -170,6 +172,7 @@ impl Service for RetransmitStage {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
self.window_service.join()?;
Ok(())
}
}

View File

@ -1,7 +1,7 @@
// Implementation of RpcRequestHandler trait for testing Rpc requests without i/o
use crate::rpc_request::{RpcRequest, RpcRequestHandler};
use serde_json::{self, Number, Value};
use serde_json::{Number, Value};
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};

View File

@ -82,7 +82,7 @@ impl Tpu {
transactions_sockets: Vec<UdpSocket>,
broadcast_socket: UdpSocket,
cluster_info: Arc<RwLock<ClusterInfo>>,
entry_height: u64,
blob_index: u64,
sigverify_disabled: bool,
max_tick_height: u64,
last_entry_id: &Hash,
@ -119,7 +119,7 @@ impl Tpu {
bank.clone(),
broadcast_socket,
cluster_info,
entry_height,
blob_index,
bank.leader_scheduler.clone(),
entry_receiver,
max_tick_height,
@ -174,7 +174,7 @@ impl Tpu {
cluster_info: Arc<RwLock<ClusterInfo>>,
sigverify_disabled: bool,
max_tick_height: u64,
entry_height: u64,
blob_index: u64,
last_entry_id: &Hash,
leader_id: Pubkey,
to_validator_sender: &TpuRotationSender,
@ -215,7 +215,7 @@ impl Tpu {
bank.clone(),
broadcast_socket,
cluster_info,
entry_height,
blob_index,
bank.leader_scheduler.clone(),
entry_receiver,
max_tick_height,

View File

@ -32,7 +32,7 @@ use std::thread;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum TvuReturnType {
LeaderRotation(u64, u64, Hash),
LeaderRotation(u64, Hash),
}
pub type TvuRotationSender = Sender<TvuReturnType>;
@ -45,7 +45,6 @@ pub struct Tvu {
storage_stage: StorageStage,
exit: Arc<AtomicBool>,
last_entry_id: Arc<RwLock<Hash>>,
entry_height: Arc<RwLock<u64>>,
}
pub struct Sockets {
@ -60,6 +59,7 @@ impl Tvu {
/// # Arguments
/// * `bank` - The bank state.
/// * `entry_height` - Initial ledger height
/// * `blob_index` - Index of last processed blob
/// * `last_entry_id` - Hash of the last entry
/// * `cluster_info` - The cluster_info state.
/// * `sockets` - My fetch, repair, and restransmit sockets
@ -68,6 +68,7 @@ impl Tvu {
pub fn new(
voting_keypair: Option<Arc<VotingKeypair>>,
bank: &Arc<Bank>,
blob_index: u64,
entry_height: u64,
last_entry_id: Hash,
cluster_info: &Arc<RwLock<ClusterInfo>>,
@ -110,7 +111,6 @@ impl Tvu {
db_ledger.clone(),
&cluster_info,
bank.tick_height(),
entry_height,
Arc::new(retransmit_socket),
repair_socket,
blob_fetch_receiver,
@ -118,7 +118,6 @@ impl Tvu {
exit.clone(),
);
let l_entry_height = Arc::new(RwLock::new(entry_height));
let l_last_entry_id = Arc::new(RwLock::new(last_entry_id));
let (replay_stage, ledger_entry_receiver) = ReplayStage::new(
@ -128,7 +127,7 @@ impl Tvu {
bank.clone(),
cluster_info.clone(),
exit.clone(),
l_entry_height.clone(),
blob_index,
l_last_entry_id.clone(),
to_leader_sender,
entry_stream,
@ -155,17 +154,13 @@ impl Tvu {
storage_stage,
exit,
last_entry_id: l_last_entry_id,
entry_height: l_entry_height,
},
blob_fetch_sender,
)
}
pub fn get_state(&self) -> (Hash, u64) {
(
*self.last_entry_id.read().unwrap(),
*self.entry_height.read().unwrap(),
)
pub fn get_state(&self) -> Hash {
*self.last_entry_id.read().unwrap()
}
pub fn is_exited(&self) -> bool {
@ -260,6 +255,7 @@ pub mod tests {
Some(Arc::new(voting_keypair)),
&bank,
0,
0,
cur_hash,
&cref1,
{
@ -346,6 +342,7 @@ pub mod tests {
Some(Arc::new(voting_keypair)),
&bank,
0,
0,
cur_hash,
&cref1,
{

View File

@ -1,14 +1,8 @@
//! The `window` module defines data structure for storing the tail of the ledger.
//!
use crate::cluster_info::ClusterInfo;
use crate::counter::Counter;
use crate::leader_scheduler::LeaderScheduler;
use crate::packet::SharedBlob;
use log::Level;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
#[derive(Default, Clone)]
@ -46,18 +40,6 @@ pub trait WindowUtil {
fn window_size(&self) -> u64;
fn repair(
&mut self,
cluster_info: &Arc<RwLock<ClusterInfo>>,
id: &Pubkey,
times: usize,
consumed: u64,
received: u64,
tick_height: u64,
max_entry_height: u64,
leader_scheduler_option: &Arc<RwLock<LeaderScheduler>>,
) -> Vec<(SocketAddr, Vec<u8>)>;
fn print(&self, id: &Pubkey, consumed: u64) -> String;
fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool;
@ -116,86 +98,6 @@ impl WindowUtil for Window {
self.len() as u64
}
fn repair(
&mut self,
cluster_info: &Arc<RwLock<ClusterInfo>>,
id: &Pubkey,
times: usize,
consumed: u64,
received: u64,
tick_height: u64,
max_entry_height: u64,
leader_scheduler_option: &Arc<RwLock<LeaderScheduler>>,
) -> Vec<(SocketAddr, Vec<u8>)> {
let rcluster_info = cluster_info.read().unwrap();
// Check if we are the next next slot leader
let is_next_leader = {
let leader_scheduler = leader_scheduler_option.read().unwrap();
let next_slot = leader_scheduler.tick_height_to_slot(tick_height) + 1;
match leader_scheduler.get_leader_for_slot(next_slot) {
Some(leader_id) if leader_id == *id => true,
// In the case that we are not in the current scope of the leader schedule
// window then either:
//
// 1) The replay stage hasn't caught up to the "consumed" entries we sent,
// in which case it will eventually catch up
//
// 2) We are on the border between ticks_per_epochs, so the
// schedule won't be known until the entry on that cusp is received
// by the replay stage (which comes after this stage). Hence, the next
// leader at the beginning of that next epoch will not know they are the
// leader until they receive that last "cusp" entry. The leader also won't ask for repairs
// for that entry because "is_next_leader" won't be set here. In this case,
// everybody will be blocking waiting for that "cusp" entry instead of repairing,
// until the leader hits "times" >= the max times in calculate_max_repair().
// The impact of this, along with the similar problem from broadcast for the transitioning
// leader, can be observed in the multinode test, test_full_leader_validator_network(),
None => false,
_ => false,
}
};
let num_peers = rcluster_info.repair_peers().len() as u64;
let max_repair = if max_entry_height == 0 {
calculate_max_repair(
num_peers,
consumed,
received,
times,
is_next_leader,
self.window_size(),
)
} else {
max_entry_height + 1
};
let idxs = self.clear_slots(consumed, max_repair);
let reqs: Vec<_> = idxs
.into_iter()
.filter_map(|pix| rcluster_info.window_index_request(pix).ok())
.collect();
drop(rcluster_info);
inc_new_counter_info!("streamer-repair_window-repair", reqs.len());
if log_enabled!(Level::Trace) {
trace!(
"{}: repair_window counter times: {} consumed: {} received: {} max_repair: {} missing: {}",
id,
times,
consumed,
received,
max_repair,
reqs.len()
);
for (to, _) in &reqs {
trace!("{}: repair_window request to {}", id, to);
}
}
reqs
}
fn print(&self, id: &Pubkey, consumed: u64) -> String {
let pointer: Vec<_> = self
.iter()

View File

@ -4,12 +4,12 @@ use crate::cluster_info::ClusterInfo;
use crate::counter::Counter;
use crate::db_ledger::DbLedger;
use crate::db_window::*;
use crate::leader_scheduler::LeaderScheduler;
use crate::repair_service::RepairService;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::streamer::{BlobReceiver, BlobSender};
use log::Level;
use rand::{thread_rng, Rng};
use solana_metrics::{influxdb, submit};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms;
@ -17,7 +17,7 @@ use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant};
pub const MAX_REPAIR_BACKOFF: usize = 128;
@ -27,27 +27,6 @@ pub enum WindowServiceReturnType {
LeaderRotation(u64),
}
fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
//exponential backoff
if *last != consumed {
//start with a 50% chance of asking for repairs
*times = 1;
}
*last = consumed;
*times += 1;
// Experiment with capping repair request duration.
// Once nodes are too far behind they can spend many
// seconds without asking for repair
if *times > MAX_REPAIR_BACKOFF {
// 50% chance that a request will fire between 64 - 128 tries
*times = MAX_REPAIR_BACKOFF / 2;
}
//if we get lucky, make the request, which should exponentially get less likely
thread_rng().gen_range(0, *times as u64) == 0
}
#[allow(clippy::too_many_arguments)]
fn recv_window(
db_ledger: &Arc<DbLedger>,
@ -108,99 +87,98 @@ fn recv_window(
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub fn window_service(
db_ledger: Arc<DbLedger>,
cluster_info: Arc<RwLock<ClusterInfo>>,
tick_height: u64,
entry_height: u64,
max_entry_height: u64,
r: BlobReceiver,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
done: Arc<AtomicBool>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
let mut tick_height_ = tick_height;
let mut last = entry_height;
let mut times = 0;
let id = cluster_info.read().unwrap().id();
trace!("{}: RECV_WINDOW started", id);
loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = recv_window(
&db_ledger,
&id,
&leader_scheduler,
&mut tick_height_,
max_entry_height,
&r,
&retransmit,
&done,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!("streamer-window-error", 1, 1);
error!("window error: {:?}", e);
}
// Implement a destructor for the window_service thread to signal it exited
// even on panics
struct Finalizer {
exit_sender: Arc<AtomicBool>,
}
impl Finalizer {
fn new(exit_sender: Arc<AtomicBool>) -> Self {
Finalizer { exit_sender }
}
}
// Implement a destructor for Finalizer.
impl Drop for Finalizer {
fn drop(&mut self) {
self.exit_sender.clone().store(true, Ordering::Relaxed);
}
}
pub struct WindowService {
t_window: JoinHandle<()>,
repair_service: RepairService,
}
impl WindowService {
#[allow(clippy::too_many_arguments)]
pub fn new(
db_ledger: Arc<DbLedger>,
cluster_info: Arc<RwLock<ClusterInfo>>,
tick_height: u64,
max_entry_height: u64,
r: BlobReceiver,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
done: Arc<AtomicBool>,
exit: Arc<AtomicBool>,
) -> WindowService {
let exit_ = exit.clone();
let repair_service = RepairService::new(
db_ledger.clone(),
exit.clone(),
repair_socket,
cluster_info.clone(),
);
let t_window = Builder::new()
.name("solana-window".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_);
let mut tick_height_ = tick_height;
let id = cluster_info.read().unwrap().id();
trace!("{}: RECV_WINDOW started", id);
loop {
if exit.load(Ordering::Relaxed) {
break;
}
}
let meta = db_ledger.meta();
if let Ok(Some(meta)) = meta {
let received = meta.received;
let consumed = meta.consumed;
// Consumed should never be bigger than received
assert!(consumed <= received);
if received == consumed {
trace!(
"{} we have everything received: {} consumed: {}",
id,
received,
consumed
);
continue;
}
//exponential backoff
if !repair_backoff(&mut last, &mut times, consumed) {
trace!("{} !repair_backoff() times = {}", id, times);
continue;
}
trace!("{} let's repair! times = {}", id, times);
let reqs = repair(
if let Err(e) = recv_window(
&db_ledger,
&cluster_info,
&id,
times,
tick_height_,
max_entry_height,
&leader_scheduler,
);
if let Ok(reqs) = reqs {
for (to, req) in reqs {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
0
});
&mut tick_height_,
max_entry_height,
&r,
&retransmit,
&done,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!("streamer-window-error", 1, 1);
error!("window error: {:?}", e);
}
}
}
}
}
})
.unwrap()
})
.unwrap();
WindowService {
t_window,
repair_service,
}
}
}
impl Service for WindowService {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_window.join()?;
self.repair_service.join()
}
}
#[cfg(test)]
@ -210,9 +188,9 @@ mod test {
use crate::db_ledger::DbLedger;
use crate::entry::make_consecutive_blobs;
use crate::leader_scheduler::LeaderScheduler;
use crate::service::Service;
use crate::streamer::{blob_receiver, responder};
use crate::window_service::{repair_backoff, window_service};
use crate::window_service::WindowService;
use solana_sdk::hash::Hash;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
@ -246,12 +224,11 @@ mod test {
);
let mut leader_schedule = LeaderScheduler::default();
leader_schedule.set_leader_schedule(vec![me_id]);
let t_window = window_service(
let t_window = WindowService::new(
db_ledger,
subs,
0,
0,
0,
r_reader,
s_retransmit,
Arc::new(leader_node.sockets.repair),
@ -328,12 +305,11 @@ mod test {
);
let mut leader_schedule = LeaderScheduler::default();
leader_schedule.set_leader_schedule(vec![me_id]);
let t_window = window_service(
let t_window = WindowService::new(
db_ledger,
subs.clone(),
0,
0,
0,
r_reader,
s_retransmit,
Arc::new(leader_node.sockets.repair),
@ -382,33 +358,4 @@ mod test {
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&db_ledger_path);
}
#[test]
pub fn test_repair_backoff() {
let num_tests = 100;
let res: usize = (0..num_tests)
.map(|_| {
let mut last = 0;
let mut times = 0;
let total: usize = (0..127)
.map(|x| {
let rv = repair_backoff(&mut last, &mut times, 1) as usize;
assert_eq!(times, x + 2);
rv
})
.sum();
assert_eq!(times, 128);
assert_eq!(last, 1);
repair_backoff(&mut last, &mut times, 1);
assert_eq!(times, 64);
repair_backoff(&mut last, &mut times, 2);
assert_eq!(times, 2);
assert_eq!(last, 2);
total
})
.sum();
let avg = res / num_tests;
assert!(avg >= 3);
assert!(avg <= 5);
}
}

View File

@ -3,7 +3,7 @@ use solana::blob_fetch_stage::BlobFetchStage;
use solana::client::mk_client;
use solana::cluster_info::{ClusterInfo, Node, NodeInfo};
use solana::db_ledger::{create_tmp_sample_ledger, tmp_copy_ledger};
use solana::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT};
use solana::db_ledger::{DbLedger, DbLedgerConfig, DEFAULT_SLOT_HEIGHT};
use solana::entry::{reconstruct_entries_from_blobs, Entry};
use solana::fullnode::{new_bank_from_ledger, Fullnode, FullnodeConfig, FullnodeReturnType};
use solana::gossip_service::GossipService;
@ -728,10 +728,11 @@ fn test_multi_node_dynamic_network() {
let mut ledger_paths = Vec::new();
ledger_paths.push(genesis_ledger_path.clone());
let leader_ledger_path = tmp_copy_ledger(&genesis_ledger_path, "multi_node_dynamic_network");
let alice_arc = Arc::new(RwLock::new(alice));
let leader_data = leader.info.clone();
let leader_ledger_path = tmp_copy_ledger(&genesis_ledger_path, "multi_node_dynamic_network");
ledger_paths.push(leader_ledger_path.clone());
let voting_keypair = VotingKeypair::new_local(&leader_keypair);
let server = Fullnode::new(
@ -926,14 +927,14 @@ fn test_leader_to_validator_transition() {
let mut fullnode_config = FullnodeConfig::default();
let ticks_per_slot = 5;
fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new(
fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new(
ticks_per_slot,
1,
// Setup window length to exclude the genesis bootstrap leader vote at tick height 0, so
// that when the leader schedule is recomputed for epoch 1 only the validator vote at tick
// height 1 will be considered.
ticks_per_slot,
);
));
// Initialize the leader ledger. Make a mint and a genesis entry
// in the leader ledger
@ -1007,7 +1008,12 @@ fn test_leader_to_validator_transition() {
leader_exit();
info!("Check the ledger to make sure it's the right height...");
let bank = new_bank_from_ledger(&leader_ledger_path, &LeaderSchedulerConfig::default()).0;
let bank = new_bank_from_ledger(
&leader_ledger_path,
DbLedgerConfig::default(),
&LeaderSchedulerConfig::default(),
)
.0;
assert_eq!(
bank.tick_height(),
@ -1072,11 +1078,11 @@ fn test_leader_validator_basic() {
// Create the leader scheduler config
let mut fullnode_config = FullnodeConfig::default();
let ticks_per_slot = 5;
fullnode_config.leader_scheduler_config = LeaderSchedulerConfig::new(
fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new(
ticks_per_slot,
1, // 1 slot per epoch
ticks_per_slot,
);
));
// Start the validator node
let voting_keypair = VotingKeypair::new_local(&validator_keypair);
@ -1171,8 +1177,11 @@ fn test_dropped_handoff_recovery() {
let ticks_per_slot = 5;
let ticks_per_epoch = slots_per_epoch * ticks_per_slot;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.leader_scheduler_config =
LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, ticks_per_epoch);
fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new(
ticks_per_slot,
slots_per_epoch,
ticks_per_epoch,
));
// Make a common mint and a genesis entry for both leader + validator's ledgers
let num_ending_ticks = 1;
@ -1385,12 +1394,16 @@ fn test_full_leader_validator_network() {
let ticks_per_slot = 5;
let ticks_per_epoch = slots_per_epoch * ticks_per_slot;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.leader_scheduler_config =
LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, ticks_per_epoch);
fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new(
ticks_per_slot,
slots_per_epoch,
ticks_per_epoch * 3,
));
let mut nodes = vec![];
info!("Start up the validators");
// Start up the validators
for kp in node_keypairs.into_iter() {
let validator_ledger_path = tmp_copy_ledger(
&bootstrap_leader_ledger_path,
@ -1579,8 +1592,11 @@ fn test_broadcast_last_tick() {
let bootstrap_leader_keypair = Arc::new(bootstrap_leader_keypair);
let voting_keypair = VotingKeypair::new_local(&bootstrap_leader_keypair);
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.leader_scheduler_config =
LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, ticks_per_epoch);
fullnode_config.set_leader_scheduler_config(LeaderSchedulerConfig::new(
ticks_per_slot,
slots_per_epoch,
ticks_per_epoch,
));
let bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_keypair,

View File

@ -9,7 +9,9 @@ use bincode::deserialize;
use solana::client::mk_client;
use solana::cluster_info::{ClusterInfo, Node, NodeInfo};
use solana::db_ledger::DbLedger;
use solana::db_ledger::{create_tmp_sample_ledger, get_tmp_ledger_path, tmp_copy_ledger};
use solana::db_ledger::{
create_tmp_sample_ledger, get_tmp_ledger_path, tmp_copy_ledger, DEFAULT_SLOT_HEIGHT,
};
use solana::entry::Entry;
use solana::fullnode::{Fullnode, FullnodeConfig};
use solana::replicator::Replicator;
@ -148,7 +150,7 @@ fn test_replicator_startup_basic() {
let cluster_info = ClusterInfo::new(tn.info.clone());
let repair_index = replicator.entry_height();
let req = cluster_info
.window_index_request_bytes(repair_index)
.window_index_request_bytes(DEFAULT_SLOT_HEIGHT, repair_index)
.unwrap();
let exit = Arc::new(AtomicBool::new(false));