solana/src/window_service.rs

452 lines
15 KiB
Rust
Raw Normal View History

2018-09-07 15:00:26 -07:00
//! The `window_service` provides a thread for maintaining a window (tail of the ledger).
//!
2018-12-07 19:16:27 -08:00
use crate::cluster_info::ClusterInfo;
use crate::counter::Counter;
use crate::db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT};
use crate::db_window::*;
use crate::entry::EntrySender;
use crate::leader_scheduler::LeaderScheduler;
use crate::result::{Error, Result};
use crate::streamer::{BlobReceiver, BlobSender};
2018-09-07 15:00:26 -07:00
use log::Level;
use rand::{thread_rng, Rng};
2018-11-16 08:45:59 -08:00
use solana_metrics::{influxdb, submit};
use solana_sdk::pubkey::Pubkey;
2018-11-16 08:45:59 -08:00
use solana_sdk::timing::duration_as_ms;
use std::borrow::Borrow;
2018-09-07 15:00:26 -07:00
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize};
2018-09-07 15:00:26 -07:00
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
/// Cap on the `times` counter maintained by `repair_backoff`: once the counter
/// exceeds this value it is reset to half of it.
pub const MAX_REPAIR_BACKOFF: usize = 128;
/// Return-value type associated with the window service.
///
/// NOTE(review): nothing in the visible part of this file constructs or matches
/// on this type; presumably it is consumed by callers elsewhere -- confirm.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum WindowServiceReturnType {
/// NOTE(review): presumably signals a leader rotation; the meaning of the
/// `u64` payload is not shown here -- confirm against the users of this type.
LeaderRotation(u64),
}
2018-09-07 15:00:26 -07:00
/// Randomly decides whether a repair request should be issued on this pass.
///
/// `last` holds the `consumed` value seen on the previous call and `times`
/// counts the calls made since `consumed` last changed. After both are
/// updated, the function returns `true` with probability `1 / *times`, so a
/// request becomes less likely the longer `consumed` stays unchanged.
fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
    // A change in `consumed` restarts the counter at 2, which gives the first
    // draw a 50% chance of firing; otherwise the counter simply grows by one.
    let progressed = *last != consumed;
    *last = consumed;
    *times = if progressed { 2 } else { *times + 1 };

    // Experiment with capping repair request duration: nodes that are far
    // behind could otherwise go many seconds without asking for repair.
    // Past the cap, fall back to the midpoint so a request fires with roughly
    // 50% chance somewhere between 64 and 128 tries.
    if *times > MAX_REPAIR_BACKOFF {
        *times = MAX_REPAIR_BACKOFF / 2;
    }

    // Make the request only if we get lucky: one chance in `*times`.
    let upper = *times as u64;
    let draw = thread_rng().gen_range(0, upper);
    draw == 0
}
#[allow(clippy::too_many_arguments)]
2018-09-07 15:00:26 -07:00
fn recv_window(
db_ledger: &Arc<RwLock<DbLedger>>,
2018-09-07 15:00:26 -07:00
id: &Pubkey,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
tick_height: &mut u64,
max_ix: u64,
2018-09-07 15:00:26 -07:00
r: &BlobReceiver,
s: &EntrySender,
2018-09-07 15:00:26 -07:00
retransmit: &BlobSender,
2018-09-27 13:49:50 -07:00
done: &Arc<AtomicBool>,
2018-09-07 15:00:26 -07:00
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?;
2018-09-07 15:00:26 -07:00
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
let now = Instant::now();
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
2018-11-16 08:45:59 -08:00
submit(
influxdb::Point::new("recv-window")
.add_field("count", influxdb::Value::Integer(dq.len() as i64))
.to_owned(),
);
retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit)?;
2018-09-07 15:00:26 -07:00
let mut pixs = Vec::new();
//send a contiguous set of blocks
let mut consume_queue = Vec::new();
trace!("{} num blobs received: {}", id, dq.len());
2018-09-07 15:00:26 -07:00
for b in dq {
let (pix, meta_size) = {
let p = b.read().unwrap();
(p.index()?, p.meta.size)
2018-09-07 15:00:26 -07:00
};
submit(
influxdb::Point::new("window-service")
.add_field("last-recv", influxdb::Value::Integer(pix as i64))
.to_owned(),
);
pixs.push(pix);
2018-09-07 15:00:26 -07:00
trace!("{} window pix: {} size: {}", id, pix, meta_size);
let _ = process_blob(
leader_scheduler,
db_ledger,
&b,
max_ix,
2018-09-07 15:00:26 -07:00
pix,
&mut consume_queue,
tick_height,
done,
2018-09-07 15:00:26 -07:00
);
}
trace!(
"Elapsed processing time in recv_window(): {}",
duration_as_ms(&now.elapsed())
);
2018-09-07 15:00:26 -07:00
if !consume_queue.is_empty() {
inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
s.send(consume_queue)?;
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
2018-09-07 15:00:26 -07:00
pub fn window_service(
db_ledger: Arc<RwLock<DbLedger>>,
2018-10-08 19:55:54 -07:00
cluster_info: Arc<RwLock<ClusterInfo>>,
tick_height: u64,
2018-09-07 15:00:26 -07:00
entry_height: u64,
max_entry_height: u64,
2018-09-07 15:00:26 -07:00
r: BlobReceiver,
s: EntrySender,
2018-09-07 15:00:26 -07:00
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
Leader scheduler plumbing (#1440) * Added LeaderScheduler module and tests * plumbing for LeaderScheduler in Fullnode + tests. Add vote processing for active set to ReplicateStage and WriteStage * Add LeaderScheduler plumbing for Tvu, window, and tests * Fix bank and switch tests to use new LeaderScheduler * move leader rotation check from window service to replicate stage * Add replicate_stage leader rotation exit test * removed leader scheduler from the window service and associated modules/tests * Corrected is_leader calculation in repair() function in window.rs * Integrate LeaderScheduler with write_stage for leader to validator transitions * Integrated LeaderScheduler with BroadcastStage * Removed gossip leader rotation from crdt * Add multi validator, leader test * Comments and cleanup * Remove unneeded checks from broadcast stage * Fix case where a validator/leader need to immediately transition on startup after reading ledger and seeing they are not in the correct role * Set new leader in validator -> validator transitions * Clean up for PR comments, refactor LeaderScheduler from process_entry/process_ledger_tail * Cleaned out LeaderScheduler options, implemented LeaderScheduler strategy that only picks the bootstrap leader to support existing tests, drone/airdrops * Ignore test_full_leader_validator_network test due to bug where the next leader in line fails to get the last entry before rotation (b/c it hasn't started up yet). Added a test test_dropped_handoff_recovery go track this bug
2018-10-10 16:49:41 -07:00
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
done: Arc<AtomicBool>,
Leader scheduler plumbing (#1440) * Added LeaderScheduler module and tests * plumbing for LeaderScheduler in Fullnode + tests. Add vote processing for active set to ReplicateStage and WriteStage * Add LeaderScheduler plumbing for Tvu, window, and tests * Fix bank and switch tests to use new LeaderScheduler * move leader rotation check from window service to replicate stage * Add replicate_stage leader rotation exit test * removed leader scheduler from the window service and associated modules/tests * Corrected is_leader calculation in repair() function in window.rs * Integrate LeaderScheduler with write_stage for leader to validator transitions * Integrated LeaderScheduler with BroadcastStage * Removed gossip leader rotation from crdt * Add multi validator, leader test * Comments and cleanup * Remove unneeded checks from broadcast stage * Fix case where a validator/leader need to immediately transition on startup after reading ledger and seeing they are not in the correct role * Set new leader in validator -> validator transitions * Clean up for PR comments, refactor LeaderScheduler from process_entry/process_ledger_tail * Cleaned out LeaderScheduler options, implemented LeaderScheduler strategy that only picks the bootstrap leader to support existing tests, drone/airdrops * Ignore test_full_leader_validator_network test due to bug where the next leader in line fails to get the last entry before rotation (b/c it hasn't started up yet). Added a test test_dropped_handoff_recovery go track this bug
2018-10-10 16:49:41 -07:00
) -> JoinHandle<()> {
2018-09-07 15:00:26 -07:00
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
let mut tick_height_ = tick_height;
2018-09-07 15:00:26 -07:00
let mut last = entry_height;
let mut times = 0;
let id = cluster_info.read().unwrap().id();
2018-09-07 15:00:26 -07:00
trace!("{}: RECV_WINDOW started", id);
loop {
if let Err(e) = recv_window(
&db_ledger,
2018-09-07 15:00:26 -07:00
&id,
&leader_scheduler,
&mut tick_height_,
max_entry_height,
2018-09-07 15:00:26 -07:00
&r,
&s,
&retransmit,
2018-09-27 13:49:50 -07:00
&done,
2018-09-07 15:00:26 -07:00
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!("streamer-window-error", 1, 1);
error!("window error: {:?}", e);
}
}
}
let meta = {
let rlock = db_ledger.read().unwrap();
rlock
.meta_cf
.get(&rlock.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
};
if let Ok(Some(meta)) = meta {
let received = meta.received;
let consumed = meta.consumed;
submit(
influxdb::Point::new("window-stage")
.add_field("consumed", influxdb::Value::Integer(consumed as i64))
.to_owned(),
);
2018-09-07 15:00:26 -07:00
// Consumed should never be bigger than received
assert!(consumed <= received);
if received == consumed {
trace!(
"{} we have everything received: {} consumed: {}",
id,
received,
consumed
);
continue;
}
2018-09-07 15:00:26 -07:00
//exponential backoff
if !repair_backoff(&mut last, &mut times, consumed) {
trace!("{} !repair_backoff() times = {}", id, times);
continue;
}
trace!("{} let's repair! times = {}", id, times);
let reqs = repair(
db_ledger.read().unwrap().borrow(),
&cluster_info,
&id,
times,
tick_height_,
max_entry_height,
&leader_scheduler,
);
if let Ok(reqs) = reqs {
for (to, req) in reqs {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
0
});
}
}
2018-09-07 15:00:26 -07:00
}
}
})
.unwrap()
2018-09-07 15:00:26 -07:00
}
#[cfg(test)]
mod test {
2018-12-07 19:16:27 -08:00
use crate::cluster_info::{ClusterInfo, Node};
use crate::db_ledger::DbLedger;
use crate::entry::Entry;
use crate::leader_scheduler::LeaderScheduler;
use crate::ledger::get_tmp_ledger_path;
use crate::logger;
use crate::packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE};
use crate::streamer::{blob_receiver, responder};
use crate::window_service::{repair_backoff, window_service};
use rocksdb::{Options, DB};
2018-11-16 08:04:46 -08:00
use solana_sdk::hash::Hash;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
2018-09-07 15:00:26 -07:00
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
2018-09-07 15:00:26 -07:00
use std::sync::{Arc, RwLock};
use std::time::Duration;
fn get_entries(r: Receiver<Vec<Entry>>, num: &mut usize) {
2018-09-07 15:00:26 -07:00
for _t in 0..5 {
let timer = Duration::new(1, 0);
match r.recv_timeout(timer) {
Ok(m) => {
*num += m.len();
}
e => info!("error {:?}", e),
}
if *num == 10 {
break;
}
}
}
/// Runs a window service with this node set as leader, sends it the ten blobs
/// produced by `make_consecutive_blobs` in reversed order, and checks that ten
/// entries come out of the window and ten blobs are handed to the retransmit
/// channel.
#[test]
pub fn window_send_test() {
    logger::setup();
    let tn = Node::new_localhost();
    let exit = Arc::new(AtomicBool::new(false));
    let mut cluster_info_me = ClusterInfo::new(tn.info.clone());
    let me_id = cluster_info_me.my_data().id;
    cluster_info_me.set_leader(me_id);
    let subs = Arc::new(RwLock::new(cluster_info_me));

    // Blobs arriving on the gossip socket are fed into the window service.
    let (s_reader, r_reader) = channel();
    let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
    let (s_window, r_window) = channel();
    let (s_retransmit, r_retransmit) = channel();
    let done = Arc::new(AtomicBool::new(false));
    let db_ledger_path = get_tmp_ledger_path("window_send_test");
    let db_ledger = Arc::new(RwLock::new(
        DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"),
    ));
    let t_window = window_service(
        db_ledger,
        subs,
        0,
        0,
        0,
        r_reader,
        s_window,
        s_retransmit,
        Arc::new(tn.sockets.repair),
        Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
        done,
    );

    let t_responder = {
        let (s_responder, r_responder) = channel();
        let blob_sockets: Vec<Arc<UdpSocket>> =
            tn.sockets.tvu.into_iter().map(Arc::new).collect();
        let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
        let num_blobs_to_make = 10;
        let gossip_address = &tn.info.gossip;
        // Reverse the generated blobs so the window receives them out of order.
        let msgs = make_consecutive_blobs(
            me_id,
            num_blobs_to_make,
            0,
            Hash::default(),
            &gossip_address,
        )
        .into_iter()
        .rev()
        .collect();
        s_responder.send(msgs).expect("send");
        t_responder
    };

    let mut num = 0;
    get_entries(r_window, &mut num);
    assert_eq!(num, 10);

    // Gather every batch that has been queued for retransmission so far.
    let mut q = r_retransmit.recv().unwrap();
    while let Ok(mut nq) = r_retransmit.try_recv() {
        q.append(&mut nq);
    }
    assert_eq!(q.len(), 10);

    exit.store(true, Ordering::Relaxed);
    t_receiver.join().expect("join");
    t_responder.join().expect("join");
    t_window.join().expect("join");
    DB::destroy(&Options::default(), &db_ledger_path)
        .expect("Expected successful database destuction");
    let _ignored = remove_dir_all(&db_ledger_path);
}
/// Sends blobs 9 down to 0 before this node is set as leader in its
/// `ClusterInfo`, then blobs 10 through 13 afterwards, and checks that more
/// than ten blobs end up on the retransmit channel.
#[test]
pub fn window_send_leader_test2() {
    logger::setup();
    let node = Node::new_localhost();
    let exit = Arc::new(AtomicBool::new(false));
    let cluster_info_me = ClusterInfo::new(node.info.clone());
    let me_id = cluster_info_me.my_data().id;
    let cluster = Arc::new(RwLock::new(cluster_info_me));

    let (reader_tx, reader_rx) = channel();
    let t_receiver = blob_receiver(Arc::new(node.sockets.gossip), exit.clone(), reader_tx);
    let (window_tx, _window_rx) = channel();
    let (retransmit_tx, retransmit_rx) = channel();
    let done = Arc::new(AtomicBool::new(false));
    let db_ledger_path = get_tmp_ledger_path("window_send_late_leader_test");
    let db_ledger = Arc::new(RwLock::new(
        DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"),
    ));
    let scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id)));
    let t_window = window_service(
        db_ledger,
        cluster.clone(),
        0,
        0,
        0,
        reader_rx,
        window_tx,
        retransmit_tx,
        Arc::new(node.sockets.repair),
        scheduler,
        done,
    );

    let t_responder = {
        let (responder_tx, responder_rx) = channel();
        let blob_sockets: Vec<Arc<UdpSocket>> =
            node.sockets.tvu.into_iter().map(Arc::new).collect();
        let handle = responder("window_send_test", blob_sockets[0].clone(), responder_rx);

        // First batch: indexes 9 down to 0, sent before the leader is set.
        let mut early = Vec::new();
        for index in (0..10).rev() {
            let blob = SharedBlob::default();
            {
                let mut guard = blob.write().unwrap();
                guard.set_index(index).unwrap();
                guard.set_id(&me_id).unwrap();
                assert_eq!(index, guard.index().unwrap());
                guard.meta.size = PACKET_DATA_SIZE;
                guard.meta.set_addr(&node.info.gossip);
            }
            early.push(blob);
        }
        responder_tx.send(early).expect("send");

        cluster.write().unwrap().set_leader(me_id);

        // Second batch: indexes 10 through 13, sent after the leader is set.
        let mut late = Vec::new();
        for index in 10..14 {
            let blob = SharedBlob::default();
            {
                let mut guard = blob.write().unwrap();
                guard.set_index(index).unwrap();
                guard.set_id(&me_id).unwrap();
                assert_eq!(index, guard.index().unwrap());
                guard.meta.size = PACKET_DATA_SIZE;
                guard.meta.set_addr(&node.info.gossip);
            }
            late.push(blob);
        }
        responder_tx.send(late).expect("send");
        handle
    };

    // Keep collecting until the retransmit channel stays quiet for 100ms.
    let mut retransmitted = retransmit_rx.recv().unwrap();
    while let Ok(mut more) = retransmit_rx.recv_timeout(Duration::from_millis(100)) {
        retransmitted.append(&mut more);
    }
    assert!(retransmitted.len() > 10);

    exit.store(true, Ordering::Relaxed);
    t_receiver.join().expect("join");
    t_responder.join().expect("join");
    t_window.join().expect("join");
    DB::destroy(&Options::default(), &db_ledger_path)
        .expect("Expected successful database destuction");
    let _ignored = remove_dir_all(&db_ledger_path);
}
/// Checks the bookkeeping of `repair_backoff` (counter growth, the reset once
/// the cap is exceeded, and the restart when `consumed` changes) and that,
/// averaged over 100 runs of 127 calls, between 3 and 5 calls fire per run.
#[test]
pub fn test_repair_backoff() {
    let num_tests = 100;
    let mut fired: usize = 0;

    for _ in 0..num_tests {
        let mut last = 0;
        let mut times = 0;

        // `consumed` changes on the first call only, so the counter runs 2, 3, ...
        for call in 0..127 {
            if repair_backoff(&mut last, &mut times, 1) {
                fired += 1;
            }
            assert_eq!(times, call + 2);
        }
        assert_eq!(times, 128);
        assert_eq!(last, 1);

        // One more unchanged call exceeds the cap and drops the counter to half.
        repair_backoff(&mut last, &mut times, 1);
        assert_eq!(times, 64);

        // A new `consumed` value restarts the counter.
        repair_backoff(&mut last, &mut times, 2);
        assert_eq!(times, 2);
        assert_eq!(last, 2);
    }

    let avg = fired / num_tests;
    assert!(avg >= 3);
    assert!(avg <= 5);
}
}