Ensure forwarded Blobs don't break Erasure (#3907)
This commit is contained in:
parent
c0bffb56df
commit
349e8a9462
|
@ -1197,7 +1197,6 @@ where
|
||||||
.map(|(idx, entry)| {
|
.map(|(idx, entry)| {
|
||||||
let mut b = entry.borrow().to_blob();
|
let mut b = entry.borrow().to_blob();
|
||||||
b.set_index(idx as u64);
|
b.set_index(idx as u64);
|
||||||
b.forward(true);
|
|
||||||
b.set_id(&keypair.pubkey());
|
b.set_id(&keypair.pubkey());
|
||||||
b.set_slot(0);
|
b.set_slot(0);
|
||||||
b
|
b
|
||||||
|
|
|
@ -647,13 +647,17 @@ impl ClusterInfo {
|
||||||
peers: &[ContactInfo],
|
peers: &[ContactInfo],
|
||||||
blob: &SharedBlob,
|
blob: &SharedBlob,
|
||||||
s: &UdpSocket,
|
s: &UdpSocket,
|
||||||
|
forwarded: bool,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let (me, orders): (ContactInfo, &[ContactInfo]) = {
|
let (me, orders): (ContactInfo, &[ContactInfo]) = {
|
||||||
// copy to avoid locking during IO
|
// copy to avoid locking during IO
|
||||||
let s = obj.read().unwrap();
|
let s = obj.read().unwrap();
|
||||||
(s.my_data().clone(), peers)
|
(s.my_data().clone(), peers)
|
||||||
};
|
};
|
||||||
let rblob = blob.read().unwrap();
|
// hold a write lock so no one modifies the blob until we send it
|
||||||
|
let mut wblob = blob.write().unwrap();
|
||||||
|
let was_forwarded = !wblob.should_forward();
|
||||||
|
wblob.set_forwarded(forwarded);
|
||||||
trace!("retransmit orders {}", orders.len());
|
trace!("retransmit orders {}", orders.len());
|
||||||
let errs: Vec<_> = orders
|
let errs: Vec<_> = orders
|
||||||
.par_iter()
|
.par_iter()
|
||||||
|
@ -661,15 +665,17 @@ impl ClusterInfo {
|
||||||
debug!(
|
debug!(
|
||||||
"{}: retransmit blob {} to {} {}",
|
"{}: retransmit blob {} to {} {}",
|
||||||
me.id,
|
me.id,
|
||||||
rblob.index(),
|
wblob.index(),
|
||||||
v.id,
|
v.id,
|
||||||
v.tvu,
|
v.tvu,
|
||||||
);
|
);
|
||||||
//TODO profile this, may need multiple sockets for par_iter
|
//TODO profile this, may need multiple sockets for par_iter
|
||||||
assert!(rblob.meta.size <= BLOB_SIZE);
|
assert!(wblob.meta.size <= BLOB_SIZE);
|
||||||
s.send_to(&rblob.data[..rblob.meta.size], &v.tvu)
|
s.send_to(&wblob.data[..wblob.meta.size], &v.tvu)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
// reset the blob to its old state. This avoids us having to copy the blob to modify it
|
||||||
|
wblob.set_forwarded(was_forwarded);
|
||||||
for e in errs {
|
for e in errs {
|
||||||
if let Err(e) = &e {
|
if let Err(e) = &e {
|
||||||
inc_new_counter_info!("cluster_info-retransmit-send_to_error", 1, 1);
|
inc_new_counter_info!("cluster_info-retransmit-send_to_error", 1, 1);
|
||||||
|
@ -683,9 +689,14 @@ impl ClusterInfo {
|
||||||
/// retransmit messages from the leader to layer 1 nodes
|
/// retransmit messages from the leader to layer 1 nodes
|
||||||
/// # Remarks
|
/// # Remarks
|
||||||
/// We need to avoid having obj locked while doing any io, such as the `send_to`
|
/// We need to avoid having obj locked while doing any io, such as the `send_to`
|
||||||
pub fn retransmit(obj: &Arc<RwLock<Self>>, blob: &SharedBlob, s: &UdpSocket) -> Result<()> {
|
pub fn retransmit(
|
||||||
|
obj: &Arc<RwLock<Self>>,
|
||||||
|
blob: &SharedBlob,
|
||||||
|
s: &UdpSocket,
|
||||||
|
forwarded: bool,
|
||||||
|
) -> Result<()> {
|
||||||
let peers = obj.read().unwrap().retransmit_peers();
|
let peers = obj.read().unwrap().retransmit_peers();
|
||||||
ClusterInfo::retransmit_to(obj, &peers, blob, s)
|
ClusterInfo::retransmit_to(obj, &peers, blob, s, forwarded)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_orders(
|
fn send_orders(
|
||||||
|
|
|
@ -415,7 +415,6 @@ pub fn make_consecutive_blobs(
|
||||||
let mut blob = blob.write().unwrap();
|
let mut blob = blob.write().unwrap();
|
||||||
blob.set_index(index);
|
blob.set_index(index);
|
||||||
blob.set_id(id);
|
blob.set_id(id);
|
||||||
blob.forward(true);
|
|
||||||
blob.meta.set_addr(addr);
|
blob.meta.set_addr(addr);
|
||||||
index += 1;
|
index += 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -225,13 +225,11 @@ impl CodingGenerator {
|
||||||
let index = data_blob.index();
|
let index = data_blob.index();
|
||||||
let slot = data_blob.slot();
|
let slot = data_blob.slot();
|
||||||
let id = data_blob.id();
|
let id = data_blob.id();
|
||||||
let should_forward = data_blob.should_forward();
|
|
||||||
|
|
||||||
let mut coding_blob = Blob::default();
|
let mut coding_blob = Blob::default();
|
||||||
coding_blob.set_index(index);
|
coding_blob.set_index(index);
|
||||||
coding_blob.set_slot(slot);
|
coding_blob.set_slot(slot);
|
||||||
coding_blob.set_id(&id);
|
coding_blob.set_id(&id);
|
||||||
coding_blob.forward(should_forward);
|
|
||||||
coding_blob.set_size(max_data_size);
|
coding_blob.set_size(max_data_size);
|
||||||
coding_blob.set_coding();
|
coding_blob.set_coding();
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;
|
||||||
#[repr(C)]
|
#[repr(C)]
|
||||||
pub struct Meta {
|
pub struct Meta {
|
||||||
pub size: usize,
|
pub size: usize,
|
||||||
pub num_retransmits: u64,
|
pub forward: bool,
|
||||||
pub addr: [u16; 8],
|
pub addr: [u16; 8],
|
||||||
pub port: u16,
|
pub port: u16,
|
||||||
pub v6: bool,
|
pub v6: bool,
|
||||||
|
@ -353,8 +353,8 @@ const PARENT_RANGE: std::ops::Range<usize> = range!(0, u64);
|
||||||
const SLOT_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
|
const SLOT_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
|
||||||
const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
|
const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
|
||||||
const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey);
|
const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey);
|
||||||
const FORWARD_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, bool);
|
const FORWARDED_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, bool);
|
||||||
const FLAGS_RANGE: std::ops::Range<usize> = range!(FORWARD_RANGE.end, u32);
|
const FLAGS_RANGE: std::ops::Range<usize> = range!(FORWARDED_RANGE.end, u32);
|
||||||
const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64);
|
const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64);
|
||||||
|
|
||||||
macro_rules! align {
|
macro_rules! align {
|
||||||
|
@ -427,11 +427,12 @@ impl Blob {
|
||||||
/// A bool is used here instead of a flag because this item is not intended to be signed when
|
/// A bool is used here instead of a flag because this item is not intended to be signed when
|
||||||
/// blob signatures are introduced
|
/// blob signatures are introduced
|
||||||
pub fn should_forward(&self) -> bool {
|
pub fn should_forward(&self) -> bool {
|
||||||
self.data[FORWARD_RANGE][0] & 0x1 == 1
|
self.data[FORWARDED_RANGE][0] & 0x1 == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn forward(&mut self, forward: bool) {
|
/// Mark this blob's forwarded status
|
||||||
self.data[FORWARD_RANGE][0] = u8::from(forward)
|
pub fn set_forwarded(&mut self, forward: bool) {
|
||||||
|
self.data[FORWARDED_RANGE][0] = u8::from(forward)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn flags(&self) -> u32 {
|
pub fn flags(&self) -> u32 {
|
||||||
|
@ -581,7 +582,6 @@ pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot:
|
||||||
blob.set_slot(slot);
|
blob.set_slot(slot);
|
||||||
blob.set_parent(parent);
|
blob.set_parent(parent);
|
||||||
blob.set_id(id);
|
blob.set_id(id);
|
||||||
blob.forward(true);
|
|
||||||
blob_index += 1;
|
blob_index += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -708,9 +708,9 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_blob_forward() {
|
fn test_blob_forward() {
|
||||||
let mut b = Blob::default();
|
let mut b = Blob::default();
|
||||||
assert!(!b.should_forward());
|
|
||||||
b.forward(true);
|
|
||||||
assert!(b.should_forward());
|
assert!(b.should_forward());
|
||||||
|
b.set_forwarded(true);
|
||||||
|
assert!(!b.should_forward());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
@ -5,7 +5,6 @@ use crate::blocktree::Blocktree;
|
||||||
use crate::cluster_info::{
|
use crate::cluster_info::{
|
||||||
compute_retransmit_peers, ClusterInfo, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
|
compute_retransmit_peers, ClusterInfo, GROW_LAYER_CAPACITY, NEIGHBORHOOD_SIZE,
|
||||||
};
|
};
|
||||||
use crate::packet::SharedBlob;
|
|
||||||
use crate::result::{Error, Result};
|
use crate::result::{Error, Result};
|
||||||
use crate::service::Service;
|
use crate::service::Service;
|
||||||
use crate::staking_utils;
|
use crate::staking_utils;
|
||||||
|
@ -48,24 +47,16 @@ fn retransmit(
|
||||||
GROW_LAYER_CAPACITY,
|
GROW_LAYER_CAPACITY,
|
||||||
);
|
);
|
||||||
for b in &dq {
|
for b in &dq {
|
||||||
if b.read().unwrap().should_forward() {
|
if b.read().unwrap().meta.forward {
|
||||||
ClusterInfo::retransmit_to(&cluster_info, &neighbors, ©_for_neighbors(b), sock)?;
|
ClusterInfo::retransmit_to(&cluster_info, &neighbors, b, sock, true)?;
|
||||||
|
ClusterInfo::retransmit_to(&cluster_info, &children, b, sock, false)?;
|
||||||
|
} else {
|
||||||
|
ClusterInfo::retransmit_to(&cluster_info, &children, b, sock, true)?;
|
||||||
}
|
}
|
||||||
// Always send blobs to children
|
|
||||||
ClusterInfo::retransmit_to(&cluster_info, &children, b, sock)?;
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Modifies a blob for neighbors nodes
|
|
||||||
#[inline]
|
|
||||||
fn copy_for_neighbors(b: &SharedBlob) -> SharedBlob {
|
|
||||||
let mut blob = b.read().unwrap().clone();
|
|
||||||
// Disable blob forwarding for neighbors
|
|
||||||
blob.forward(false);
|
|
||||||
Arc::new(RwLock::new(blob))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
|
/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
|
||||||
/// See `cluster_info` for network layer definitions.
|
/// See `cluster_info` for network layer definitions.
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
|
@ -153,17 +144,3 @@ impl Service for RetransmitStage {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
// Test that blobs always come out with forward unset for neighbors
|
|
||||||
#[test]
|
|
||||||
fn test_blob_for_neighbors() {
|
|
||||||
let blob = SharedBlob::default();
|
|
||||||
blob.write().unwrap().forward(true);
|
|
||||||
let for_hoodies = copy_for_neighbors(&blob);
|
|
||||||
assert!(!for_hoodies.read().unwrap().should_forward());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -24,6 +24,9 @@ fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey)
|
||||||
for blob in blobs {
|
for blob in blobs {
|
||||||
// Don't add blobs generated by this node to the retransmit queue
|
// Don't add blobs generated by this node to the retransmit queue
|
||||||
if blob.read().unwrap().id() != *id {
|
if blob.read().unwrap().id() != *id {
|
||||||
|
let mut w_blob = blob.write().unwrap();
|
||||||
|
w_blob.meta.forward = w_blob.should_forward();
|
||||||
|
w_blob.set_forwarded(false);
|
||||||
retransmit_queue.push(blob.clone());
|
retransmit_queue.push(blob.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,7 +176,7 @@ pub fn cluster_info_retransmit() -> result::Result<()> {
|
||||||
assert!(done);
|
assert!(done);
|
||||||
let b = SharedBlob::default();
|
let b = SharedBlob::default();
|
||||||
b.write().unwrap().meta.size = 10;
|
b.write().unwrap().meta.size = 10;
|
||||||
ClusterInfo::retransmit(&c1, &b, &tn1)?;
|
ClusterInfo::retransmit(&c1, &b, &tn1, false)?;
|
||||||
let res: Vec<_> = [tn1, tn2, tn3]
|
let res: Vec<_> = [tn1, tn2, tn3]
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|s| {
|
.map(|s| {
|
||||||
|
|
Loading…
Reference in New Issue