diff --git a/src/ledger.rs b/src/ledger.rs index 6e4c02cea1..8f97588200 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -8,7 +8,6 @@ use packet::{self, SharedBlob, BLOB_SIZE}; use rayon::prelude::*; use std::collections::VecDeque; use std::io::Cursor; -use std::sync::Arc; use transaction::Transaction; // a Block is a slice of Entries @@ -42,10 +41,7 @@ impl Block for [Entry] { } } -pub fn reconstruct_entries_from_blobs( - blobs: VecDeque, - blob_recycler: &packet::BlobRecycler, -) -> bincode::Result> { +pub fn reconstruct_entries_from_blobs(blobs: VecDeque) -> bincode::Result> { let mut entries: Vec = Vec::with_capacity(blobs.len()); for blob in blobs { @@ -53,11 +49,6 @@ pub fn reconstruct_entries_from_blobs( let msg = blob.read().unwrap(); deserialize(&msg.data()[..msg.meta.size]) }; - // if erasure is enabled, the window may hold a reference to the blob - // to be able to perform erasure decoding for missing blobs - if Arc::strong_count(&blob) == 1 { - blob_recycler.recycle(blob); - } match entry { Ok(entry) => entries.push(entry), @@ -156,10 +147,7 @@ mod tests { let mut blob_q = VecDeque::new(); entries.to_blobs(&blob_recycler, &mut blob_q); - assert_eq!( - reconstruct_entries_from_blobs(blob_q, &blob_recycler).unwrap(), - entries - ); + assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries); } #[test] @@ -167,7 +155,7 @@ mod tests { let blob_recycler = BlobRecycler::default(); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000); let blobs_q = packet::to_blobs(vec![(0, addr)], &blob_recycler).unwrap(); // <-- attack! - assert!(reconstruct_entries_from_blobs(blobs_q, &blob_recycler).is_err()); + assert!(reconstruct_entries_from_blobs(blobs_q).is_err()); } #[test] @@ -232,10 +220,7 @@ mod bench { bencher.iter(|| { let mut blob_q = VecDeque::new(); entries.to_blobs(&blob_recycler, &mut blob_q); - assert_eq!( - reconstruct_entries_from_blobs(blob_q, &blob_recycler).unwrap(), - entries - ); + assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries); }); } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index d23d57be2b..b51481727a 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -2,7 +2,6 @@ use bank::Bank; use ledger; -use packet::BlobRecycler; use result::Result; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -16,15 +15,11 @@ pub struct ReplicateStage { impl ReplicateStage { /// Process entry blobs, already in order - fn replicate_requests( - bank: &Arc, - blob_receiver: &BlobReceiver, - blob_recycler: &BlobRecycler, - ) -> Result<()> { + fn replicate_requests(bank: &Arc, blob_receiver: &BlobReceiver) -> Result<()> { let timer = Duration::new(1, 0); let blobs = blob_receiver.recv_timeout(timer)?; let blobs_len = blobs.len(); - let entries = ledger::reconstruct_entries_from_blobs(blobs, &blob_recycler)?; + let entries = ledger::reconstruct_entries_from_blobs(blobs)?; let res = bank.process_entries(entries); if res.is_err() { error!("process_entries {} {:?}", blobs_len, res); @@ -33,16 +28,11 @@ impl ReplicateStage { Ok(()) } - pub fn new( - bank: Arc, - exit: Arc, - window_receiver: BlobReceiver, - blob_recycler: BlobRecycler, - ) -> Self { + pub fn new(bank: Arc, exit: Arc, window_receiver: BlobReceiver) -> Self { let thread_hdl = Builder::new() .name("solana-replicate-stage".to_string()) .spawn(move || loop { - let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler); + let e = Self::replicate_requests(&bank, &window_receiver); if e.is_err() && exit.load(Ordering::Relaxed) { break; } diff --git a/src/tvu.rs b/src/tvu.rs index bcbec8397c..025a8bfa47 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -91,8 +91,7 @@ impl Tvu { fetch_stage.blob_receiver, ); - let replicate_stage = - ReplicateStage::new(bank, exit, window_stage.blob_receiver, blob_recycler); + let replicate_stage = ReplicateStage::new(bank, exit, window_stage.blob_receiver); let mut threads = vec![replicate_stage.thread_hdl]; threads.extend(fetch_stage.thread_hdls.into_iter());