From 349e8a94624045502a1a03382e732d7b099fdb47 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Sat, 20 Apr 2019 16:44:06 -0700 Subject: [PATCH] Ensure forwarded Blobs don't break Erasure (#3907) --- core/src/blocktree.rs | 1 - core/src/cluster_info.rs | 23 +++++++++++++++++------ core/src/entry.rs | 1 - core/src/erasure.rs | 2 -- core/src/packet.rs | 18 +++++++++--------- core/src/retransmit_stage.rs | 33 +++++---------------------------- core/src/window_service.rs | 3 +++ core/tests/gossip.rs | 2 +- 8 files changed, 35 insertions(+), 48 deletions(-) diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 3c24c675e9..0f2235347a 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -1197,7 +1197,6 @@ where .map(|(idx, entry)| { let mut b = entry.borrow().to_blob(); b.set_index(idx as u64); - b.forward(true); b.set_id(&keypair.pubkey()); b.set_slot(0); b diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 08f71ffd08..a9cf89e6d5 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -647,13 +647,17 @@ impl ClusterInfo { peers: &[ContactInfo], blob: &SharedBlob, s: &UdpSocket, + forwarded: bool, ) -> Result<()> { let (me, orders): (ContactInfo, &[ContactInfo]) = { // copy to avoid locking during IO let s = obj.read().unwrap(); (s.my_data().clone(), peers) }; - let rblob = blob.read().unwrap(); + // hold a write lock so no one modifies the blob until we send it + let mut wblob = blob.write().unwrap(); + let was_forwarded = !wblob.should_forward(); + wblob.set_forwarded(forwarded); trace!("retransmit orders {}", orders.len()); let errs: Vec<_> = orders .par_iter() @@ -661,15 +665,17 @@ impl ClusterInfo { debug!( "{}: retransmit blob {} to {} {}", me.id, - rblob.index(), + wblob.index(), v.id, v.tvu, ); //TODO profile this, may need multiple sockets for par_iter - assert!(rblob.meta.size <= BLOB_SIZE); - s.send_to(&rblob.data[..rblob.meta.size], &v.tvu) + assert!(wblob.meta.size <= BLOB_SIZE); + s.send_to(&wblob.data[..wblob.meta.size], &v.tvu) }) .collect(); + // reset the blob to its old state. This avoids us having to copy the blob to modify it + wblob.set_forwarded(was_forwarded); for e in errs { if let Err(e) = &e { inc_new_counter_info!("cluster_info-retransmit-send_to_error", 1, 1); @@ -683,9 +689,14 @@ impl ClusterInfo { /// retransmit messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` - pub fn retransmit(obj: &Arc>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> { + pub fn retransmit( + obj: &Arc>, + blob: &SharedBlob, + s: &UdpSocket, + forwarded: bool, + ) -> Result<()> { let peers = obj.read().unwrap().retransmit_peers(); - ClusterInfo::retransmit_to(obj, &peers, blob, s) + ClusterInfo::retransmit_to(obj, &peers, blob, s, forwarded) } fn send_orders( diff --git a/core/src/entry.rs b/core/src/entry.rs index 499455d3ce..b9d3b114c9 100644 --- a/core/src/entry.rs +++ b/core/src/entry.rs @@ -415,7 +415,6 @@ pub fn make_consecutive_blobs( let mut blob = blob.write().unwrap(); blob.set_index(index); blob.set_id(id); - blob.forward(true); blob.meta.set_addr(addr); index += 1; } diff --git a/core/src/erasure.rs b/core/src/erasure.rs index d79889306f..fefa0b5217 100644 --- a/core/src/erasure.rs +++ b/core/src/erasure.rs @@ -225,13 +225,11 @@ impl CodingGenerator { let index = data_blob.index(); let slot = data_blob.slot(); let id = data_blob.id(); - let should_forward = data_blob.should_forward(); let mut coding_blob = Blob::default(); coding_blob.set_index(index); coding_blob.set_slot(slot); coding_blob.set_id(&id); - coding_blob.forward(should_forward); coding_blob.set_size(max_data_size); coding_blob.set_coding(); diff --git a/core/src/packet.rs b/core/src/packet.rs index 46d8b453f2..d0bedc95d0 100644 --- a/core/src/packet.rs +++ b/core/src/packet.rs @@ -31,7 +31,7 @@ pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE; #[repr(C)] pub struct Meta { pub size: usize, - pub num_retransmits: u64, + pub forward: bool, pub addr: [u16; 8], pub port: u16, pub v6: bool, @@ -353,8 +353,8 @@ const PARENT_RANGE: std::ops::Range = range!(0, u64); const SLOT_RANGE: std::ops::Range = range!(PARENT_RANGE.end, u64); const INDEX_RANGE: std::ops::Range = range!(SLOT_RANGE.end, u64); const ID_RANGE: std::ops::Range = range!(INDEX_RANGE.end, Pubkey); -const FORWARD_RANGE: std::ops::Range = range!(ID_RANGE.end, bool); -const FLAGS_RANGE: std::ops::Range = range!(FORWARD_RANGE.end, u32); +const FORWARDED_RANGE: std::ops::Range = range!(ID_RANGE.end, bool); +const FLAGS_RANGE: std::ops::Range = range!(FORWARDED_RANGE.end, u32); const SIZE_RANGE: std::ops::Range = range!(FLAGS_RANGE.end, u64); macro_rules! align { @@ -427,11 +427,12 @@ impl Blob { /// A bool is used here instead of a flag because this item is not intended to be signed when /// blob signatures are introduced pub fn should_forward(&self) -> bool { - self.data[FORWARD_RANGE][0] & 0x1 == 1 + self.data[FORWARDED_RANGE][0] & 0x1 == 0 } - pub fn forward(&mut self, forward: bool) { - self.data[FORWARD_RANGE][0] = u8::from(forward) + /// Mark this blob's forwarded status + pub fn set_forwarded(&mut self, forward: bool) { + self.data[FORWARDED_RANGE][0] = u8::from(forward) } pub fn flags(&self) -> u32 { @@ -581,7 +582,6 @@ pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: blob.set_slot(slot); blob.set_parent(parent); blob.set_id(id); - blob.forward(true); blob_index += 1; } } @@ -708,9 +708,9 @@ mod tests { #[test] fn test_blob_forward() { let mut b = Blob::default(); - assert!(!b.should_forward()); - b.forward(true); assert!(b.should_forward()); + b.set_forwarded(true); + assert!(!b.should_forward()); } #[test] diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 65573975a5..676a096ebd 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -5,7 +5,6 @@ use crate::blocktree::Blocktree; use crate::cluster_info::{ compute_retransmit_peers, ClusterInfo, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE, }; -use crate::packet::SharedBlob; use crate::result::{Error, Result}; use crate::service::Service; use crate::staking_utils; @@ -48,24 +47,16 @@ fn retransmit( GROW_LAYER_CAPACITY, ); for b in &dq { - if b.read().unwrap().should_forward() { - ClusterInfo::retransmit_to(&cluster_info, &neighbors, ©_for_neighbors(b), sock)?; + if b.read().unwrap().meta.forward { + ClusterInfo::retransmit_to(&cluster_info, &neighbors, b, sock, true)?; + ClusterInfo::retransmit_to(&cluster_info, &children, b, sock, false)?; + } else { + ClusterInfo::retransmit_to(&cluster_info, &children, b, sock, true)?; } - // Always send blobs to children - ClusterInfo::retransmit_to(&cluster_info, &children, b, sock)?; } Ok(()) } -/// Modifies a blob for neighbors nodes -#[inline] -fn copy_for_neighbors(b: &SharedBlob) -> SharedBlob { - let mut blob = b.read().unwrap().clone(); - // Disable blob forwarding for neighbors - blob.forward(false); - Arc::new(RwLock::new(blob)) -} - /// Service to retransmit messages from the leader or layer 1 to relevant peer nodes. /// See `cluster_info` for network layer definitions. /// # Arguments @@ -153,17 +144,3 @@ impl Service for RetransmitStage { Ok(()) } } - -#[cfg(test)] -mod tests { - use super::*; - - // Test that blobs always come out with forward unset for neighbors - #[test] - fn test_blob_for_neighbors() { - let blob = SharedBlob::default(); - blob.write().unwrap().forward(true); - let for_hoodies = copy_for_neighbors(&blob); - assert!(!for_hoodies.read().unwrap().should_forward()); - } -} diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 11e2aae8e3..872a6d7bc2 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -24,6 +24,9 @@ fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) for blob in blobs { // Don't add blobs generated by this node to the retransmit queue if blob.read().unwrap().id() != *id { + let mut w_blob = blob.write().unwrap(); + w_blob.meta.forward = w_blob.should_forward(); + w_blob.set_forwarded(false); retransmit_queue.push(blob.clone()); } } diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index 59feef39bb..dd90ac505e 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -176,7 +176,7 @@ pub fn cluster_info_retransmit() -> result::Result<()> { assert!(done); let b = SharedBlob::default(); b.write().unwrap().meta.size = 10; - ClusterInfo::retransmit(&c1, &b, &tn1)?; + ClusterInfo::retransmit(&c1, &b, &tn1, false)?; let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter() .map(|s| {