verify that blobs match a known leader for the slot (#3927)

* validate that blobs match a known leader for the slot

* clippy
This commit is contained in:
Rob Walker 2019-04-22 15:21:10 -07:00 committed by GitHub
parent b27b515186
commit 4b04c37c36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 103 additions and 21 deletions

View File

@ -21,7 +21,6 @@ use solana_client::rpc_client::RpcClient;
use solana_client::rpc_request::RpcRequest;
use solana_client::thin_client::{create_client, ThinClient};
use solana_sdk::client::{AsyncClient, SyncClient};
use solana_sdk::hash::{Hash, Hasher};
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction;
@ -235,6 +234,7 @@ impl Replicator {
let (retransmit_sender, retransmit_receiver) = channel();
let window_service = WindowService::new(
None, //TODO: need a way to validate blobs... https://github.com/solana-labs/solana/issues/3924
blocktree.clone(),
cluster_info.clone(),
blob_fetch_receiver,

View File

@ -71,6 +71,7 @@ fn retransmitter(
cluster_info: Arc<RwLock<ClusterInfo>>,
r: BlobReceiver,
) -> JoinHandle<()> {
let bank_forks = bank_forks.clone();
Builder::new()
.name("solana-retransmitter".to_string())
.spawn(move || {
@ -99,7 +100,7 @@ pub struct RetransmitStage {
impl RetransmitStage {
#[allow(clippy::new_ret_no_self)]
pub fn new(
bank_forks: &Arc<RwLock<BankForks>>,
bank_forks: Arc<RwLock<BankForks>>,
blocktree: Arc<Blocktree>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
retransmit_socket: Arc<UdpSocket>,
@ -116,6 +117,7 @@ impl RetransmitStage {
retransmit_receiver,
);
let window_service = WindowService::new(
Some(bank_forks),
blocktree,
cluster_info.clone(),
fetch_stage_receiver,

View File

@ -102,7 +102,7 @@ impl Tvu {
//the packets coming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let retransmit_stage = RetransmitStage::new(
&bank_forks,
bank_forks.clone(),
blocktree.clone(),
&cluster_info,
Arc::new(retransmit_socket),

View File

@ -1,15 +1,17 @@
//! `window_service` handles the data plane incoming blobs, storing them in
//! blocktree and retransmitting where required
//!
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::packet::{SharedBlob, BLOB_HEADER_SIZE};
use crate::leader_schedule_utils::slot_leader_at;
use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use crate::repair_service::{RepairService, RepairSlotRange};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::streamer::{BlobReceiver, BlobSender};
use solana_metrics::counter::Counter;
use solana_metrics::{influxdb, submit};
use solana_runtime::bank::Bank;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket;
@ -67,33 +69,56 @@ fn process_blobs(blobs: &[SharedBlob], blocktree: &Arc<Blocktree>) -> Result<()>
Ok(())
}
/// drop blobs that are from myself or not from the correct leader for the
/// blob's slot
///
/// Returns `true` when the blob should be retransmitted and persisted:
/// - blobs that looped back from this node are always dropped
/// - when no bank (and hence no leader schedule) is available, or the slot's
///   leader is unknown, the blob is kept (for now)
/// - blobs whose claimed origin differs from the scheduled slot leader are dropped
fn should_retransmit_and_persist(blob: &Blob, bank: Option<&Arc<Bank>>, my_id: &Pubkey) -> bool {
    // Ask the leader schedule who should produce this blob's slot;
    // None when we have no bank or the slot has no known leader.
    let slot_leader_id = bank.and_then(|bank| slot_leader_at(blob.slot(), &bank));
    if blob.id() == *my_id {
        // The blob came back to its sender; drop it to break the cycle.
        inc_new_counter_info!("streamer-recv_window-circular_transmission", 1);
        false
    } else if slot_leader_id.is_none() {
        // No idea who the leader is for this slot — keep the blob (for now).
        inc_new_counter_info!("streamer-recv_window-unknown_leader", 1);
        true
    } else if slot_leader_id != Some(blob.id()) {
        // Blob claims an origin that is not the scheduled leader; drop it.
        inc_new_counter_info!("streamer-recv_window-wrong_leader", 1);
        false
    } else {
        true
    }
}
fn recv_window(
bank_forks: Option<&Arc<RwLock<BankForks>>>,
blocktree: &Arc<Blocktree>,
id: &Pubkey,
my_id: &Pubkey,
r: &BlobReceiver,
retransmit: &BlobSender,
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?;
let mut blobs = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
while let Ok(mut blob) = r.try_recv() {
blobs.append(&mut blob)
}
let now = Instant::now();
inc_new_counter_info!("streamer-recv_window-recv", dq.len(), 100);
inc_new_counter_info!("streamer-recv_window-recv", blobs.len());
submit(
influxdb::Point::new("recv-window")
.add_field("count", influxdb::Value::Integer(dq.len() as i64))
.to_owned(),
);
blobs.retain(|blob| {
should_retransmit_and_persist(
&blob.read().unwrap(),
bank_forks
.map(|bank_forks| bank_forks.read().unwrap().working_bank())
.as_ref(),
my_id,
)
});
retransmit_blobs(&dq, retransmit, id)?;
retransmit_blobs(&blobs, retransmit, my_id)?;
//send a contiguous set of blocks
trace!("{} num blobs received: {}", id, dq.len());
trace!("{} num blobs received: {}", my_id, blobs.len());
process_blobs(&dq, blocktree)?;
process_blobs(&blobs, blocktree)?;
trace!(
"Elapsed processing time in recv_window(): {}",
@ -128,6 +153,7 @@ pub struct WindowService {
impl WindowService {
pub fn new(
bank_forks: Option<Arc<RwLock<BankForks>>>,
blocktree: Arc<Blocktree>,
cluster_info: Arc<RwLock<ClusterInfo>>,
r: BlobReceiver,
@ -144,6 +170,7 @@ impl WindowService {
repair_slot_range,
);
let exit = exit.clone();
let bank_forks = bank_forks.clone();
let t_window = Builder::new()
.name("solana-window".to_string())
.spawn(move || {
@ -154,7 +181,9 @@ impl WindowService {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = recv_window(&blocktree, &id, &r, &retransmit) {
if let Err(e) =
recv_window(bank_forks.as_ref(), &blocktree, &id, &r, &retransmit)
{
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@ -187,12 +216,15 @@ impl Service for WindowService {
#[cfg(test)]
mod test {
use super::*;
use crate::bank_forks::BankForks;
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::cluster_info::{ClusterInfo, Node};
use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, EntrySlice};
use crate::packet::index_blobs;
use crate::packet::{index_blobs, Blob};
use crate::service::Service;
use crate::streamer::{blob_receiver, responder};
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
@ -224,6 +256,46 @@ mod test {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_should_retransmit_and_persist() {
    let me_id = Pubkey::new_rand();
    let leader_id = Pubkey::new_rand();
    // Bank built from a genesis block that names `leader_id` as leader,
    // giving us a known leader schedule for the early slots.
    let bank = Arc::new(Bank::new(
        &GenesisBlock::new_with_leader(100, &leader_id, 10).0,
    ));
    let mut blob = Blob::default();
    blob.set_id(&leader_id);
    // without a Bank and blobs not from me, blob continues
    assert_eq!(should_retransmit_and_persist(&blob, None, &me_id), true);
    // with a Bank for slot 0, blob continues
    assert_eq!(
        should_retransmit_and_persist(&blob, Some(&bank), &me_id),
        true
    );
    // set the blob to have come from the wrong leader
    blob.set_id(&Pubkey::new_rand());
    assert_eq!(
        should_retransmit_and_persist(&blob, Some(&bank), &me_id),
        false
    );
    // with a Bank and no idea who leader is, we keep the blobs (for now)
    // TODO: persist in blocktree that we didn't know who the leader was at the time?
    blob.set_slot(100);
    assert_eq!(
        should_retransmit_and_persist(&blob, Some(&bank), &me_id),
        true
    );
    // if the blob came back from me, it doesn't continue, whether or not I have a bank
    blob.set_id(&me_id);
    assert_eq!(should_retransmit_and_persist(&blob, None, &me_id), false);
}
#[test]
pub fn window_send_test() {
solana_logger::setup();
@ -244,6 +316,10 @@ mod test {
Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
);
let t_window = WindowService::new(
Some(Arc::new(RwLock::new(BankForks::new(
0,
Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0),
)))),
blocktree,
subs,
r_reader,
@ -316,6 +392,10 @@ mod test {
Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
);
let t_window = WindowService::new(
Some(Arc::new(RwLock::new(BankForks::new(
0,
Bank::new(&GenesisBlock::new_with_leader(100, &me_id, 10).0),
)))),
blocktree,
subs.clone(),
r_reader,