diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 19e988b509..d9e98c186e 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -21,7 +21,6 @@ use solana_client::rpc_client::RpcClient; use solana_client::rpc_request::RpcRequest; use solana_client::thin_client::{create_client, ThinClient}; use solana_sdk::client::{AsyncClient, SyncClient}; - use solana_sdk::hash::{Hash, Hasher}; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::system_transaction; @@ -235,6 +234,7 @@ impl Replicator { let (retransmit_sender, retransmit_receiver) = channel(); let window_service = WindowService::new( + None, //TODO: need a way to validate blobs... https://github.com/solana-labs/solana/issues/3924 blocktree.clone(), cluster_info.clone(), blob_fetch_receiver, diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 676a096ebd..99103311df 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -71,6 +71,7 @@ fn retransmitter( cluster_info: Arc>, r: BlobReceiver, ) -> JoinHandle<()> { + let bank_forks = bank_forks.clone(); Builder::new() .name("solana-retransmitter".to_string()) .spawn(move || { @@ -99,7 +100,7 @@ pub struct RetransmitStage { impl RetransmitStage { #[allow(clippy::new_ret_no_self)] pub fn new( - bank_forks: &Arc>, + bank_forks: Arc>, blocktree: Arc, cluster_info: &Arc>, retransmit_socket: Arc, @@ -116,6 +117,7 @@ impl RetransmitStage { retransmit_receiver, ); let window_service = WindowService::new( + Some(bank_forks), blocktree, cluster_info.clone(), fetch_stage_receiver, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 2df8235227..2bf0e30427 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -102,7 +102,7 @@ impl Tvu { //the packets coming out of blob_receiver need to be sent to the GPU and verified //then sent to the window, which does the erasure coding reconstruction let retransmit_stage = RetransmitStage::new( - &bank_forks, + bank_forks.clone(), 
blocktree.clone(), &cluster_info, Arc::new(retransmit_socket), diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 872a6d7bc2..a7557a73fc 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -1,15 +1,17 @@ //! `window_service` handles the data plane incoming blobs, storing them in //! blocktree and retransmitting where required //! +use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; -use crate::packet::{SharedBlob, BLOB_HEADER_SIZE}; +use crate::leader_schedule_utils::slot_leader_at; +use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::repair_service::{RepairService, RepairSlotRange}; use crate::result::{Error, Result}; use crate::service::Service; use crate::streamer::{BlobReceiver, BlobSender}; use solana_metrics::counter::Counter; -use solana_metrics::{influxdb, submit}; +use solana_runtime::bank::Bank; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::duration_as_ms; use std::net::UdpSocket; @@ -67,33 +69,56 @@ fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc) -> Result<()> Ok(()) } +/// drop blobs that are from myself or not from the correct leader for the +/// blob's slot +fn should_retransmit_and_persist(blob: &Blob, bank: Option<&Arc>, my_id: &Pubkey) -> bool { + let slot_leader_id = bank.and_then(|bank| slot_leader_at(blob.slot(), &bank)); + + if blob.id() == *my_id { + inc_new_counter_info!("streamer-recv_window-circular_transmission", 1); + false + } else if slot_leader_id == None { + inc_new_counter_info!("streamer-recv_window-unknown_leader", 1); + true + } else if slot_leader_id != Some(blob.id()) { + inc_new_counter_info!("streamer-recv_window-wrong_leader", 1); + false + } else { + true + } +} + fn recv_window( + bank_forks: Option<&Arc>>, blocktree: &Arc, - id: &Pubkey, + my_id: &Pubkey, r: &BlobReceiver, retransmit: &BlobSender, ) -> Result<()> { let timer = Duration::from_millis(200); - let mut dq = 
r.recv_timeout(timer)?; + let mut blobs = r.recv_timeout(timer)?; - while let Ok(mut nq) = r.try_recv() { - dq.append(&mut nq) + while let Ok(mut blob) = r.try_recv() { + blobs.append(&mut blob) } let now = Instant::now(); - inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100); + inc_new_counter_info!("streamer-recv_window-recv", blobs.len()); - submit( - influxdb::Point::new("recv-window") - .add_field("count", influxdb::Value::Integer(dq.len() as i64)) - .to_owned(), - ); + blobs.retain(|blob| { + should_retransmit_and_persist( + &blob.read().unwrap(), + bank_forks + .map(|bank_forks| bank_forks.read().unwrap().working_bank()) + .as_ref(), + my_id, + ) + }); - retransmit_blobs(&dq, retransmit, id)?; + retransmit_blobs(&blobs, retransmit, my_id)?; - //send a contiguous set of blocks - trace!("{} num blobs received: {}", id, dq.len()); + trace!("{} num blobs received: {}", my_id, blobs.len()); - process_blobs(&dq, blocktree)?; + process_blobs(&blobs, blocktree)?; trace!( "Elapsed processing time in recv_window(): {}", @@ -128,6 +153,7 @@ pub struct WindowService { impl WindowService { pub fn new( + bank_forks: Option>>, blocktree: Arc, cluster_info: Arc>, r: BlobReceiver, @@ -144,6 +170,7 @@ impl WindowService { repair_slot_range, ); let exit = exit.clone(); + let bank_forks = bank_forks.clone(); let t_window = Builder::new() .name("solana-window".to_string()) .spawn(move || { @@ -154,7 +181,9 @@ impl WindowService { if exit.load(Ordering::Relaxed) { break; } - if let Err(e) = recv_window(&blocktree, &id, &r, &retransmit) { + if let Err(e) = + recv_window(bank_forks.as_ref(), &blocktree, &id, &r, &retransmit) + { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), @@ -187,12 +216,15 @@ impl Service for WindowService { #[cfg(test)] mod test { use super::*; + use crate::bank_forks::BankForks; use crate::blocktree::{get_tmp_ledger_path, Blocktree}; use 
crate::cluster_info::{ClusterInfo, Node}; use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, EntrySlice}; - use crate::packet::index_blobs; + use crate::packet::{index_blobs, Blob}; use crate::service::Service; use crate::streamer::{blob_receiver, responder}; + use solana_runtime::bank::Bank; + use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use std::fs::remove_dir_all; use std::net::UdpSocket; @@ -224,6 +256,46 @@ mod test { Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + #[test] + fn test_should_retransmit_and_persist() { + let me_id = Pubkey::new_rand(); + let leader_id = Pubkey::new_rand(); + let bank = Arc::new(Bank::new( + &GenesisBlock::new_with_leader(100, &leader_id, 10).0, + )); + + let mut blob = Blob::default(); + blob.set_id(&leader_id); + + // without a Bank and blobs not from me, blob continues + assert_eq!(should_retransmit_and_persist(&blob, None, &me_id), true); + + // with a Bank for slot 0, blob continues + assert_eq!( + should_retransmit_and_persist(&blob, Some(&bank), &me_id), + true + ); + + // set the blob to have come from the wrong leader + blob.set_id(&Pubkey::new_rand()); + assert_eq!( + should_retransmit_and_persist(&blob, Some(&bank), &me_id), + false + ); + + // with a Bank and no idea who leader is, we keep the blobs (for now) + // TODO: persist in blocktree that we didn't know who the leader was at the time? 
+ blob.set_slot(100); + assert_eq!( + should_retransmit_and_persist(&blob, Some(&bank), &me_id), + true + ); + + // if the blob came back from me, it doesn't continue, whether or not I have a bank + blob.set_id(&me_id); + assert_eq!(should_retransmit_and_persist(&blob, None, &me_id), false); + } + #[test] pub fn window_send_test() { solana_logger::setup(); @@ -244,6 +316,10 @@ mod test { Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"), ); let t_window = WindowService::new( + Some(Arc::new(RwLock::new(BankForks::new( + 0, + Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0), + )))), blocktree, subs, r_reader, @@ -316,6 +392,10 @@ mod test { Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"), ); let t_window = WindowService::new( + Some(Arc::new(RwLock::new(BankForks::new( + 0, + Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0), + )))), blocktree, subs.clone(), r_reader,