Don't recycle in the replicate stage
Windowing stage owns all the blobs now
This commit is contained in:
parent
799b249f02
commit
1c9e7dbc45
|
@ -8,7 +8,6 @@ use packet::{self, SharedBlob, BLOB_SIZE};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use std::sync::Arc;
|
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
|
||||||
// a Block is a slice of Entries
|
// a Block is a slice of Entries
|
||||||
|
@ -42,10 +41,7 @@ impl Block for [Entry] {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn reconstruct_entries_from_blobs(
|
pub fn reconstruct_entries_from_blobs(blobs: VecDeque<SharedBlob>) -> bincode::Result<Vec<Entry>> {
|
||||||
blobs: VecDeque<SharedBlob>,
|
|
||||||
blob_recycler: &packet::BlobRecycler,
|
|
||||||
) -> bincode::Result<Vec<Entry>> {
|
|
||||||
let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
|
let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
|
||||||
|
|
||||||
for blob in blobs {
|
for blob in blobs {
|
||||||
|
@ -53,11 +49,6 @@ pub fn reconstruct_entries_from_blobs(
|
||||||
let msg = blob.read().unwrap();
|
let msg = blob.read().unwrap();
|
||||||
deserialize(&msg.data()[..msg.meta.size])
|
deserialize(&msg.data()[..msg.meta.size])
|
||||||
};
|
};
|
||||||
// if erasure is enabled, the window may hold a reference to the blob
|
|
||||||
// to be able to perform erasure decoding for missing blobs
|
|
||||||
if Arc::strong_count(&blob) == 1 {
|
|
||||||
blob_recycler.recycle(blob);
|
|
||||||
}
|
|
||||||
|
|
||||||
match entry {
|
match entry {
|
||||||
Ok(entry) => entries.push(entry),
|
Ok(entry) => entries.push(entry),
|
||||||
|
@ -156,10 +147,7 @@ mod tests {
|
||||||
let mut blob_q = VecDeque::new();
|
let mut blob_q = VecDeque::new();
|
||||||
entries.to_blobs(&blob_recycler, &mut blob_q);
|
entries.to_blobs(&blob_recycler, &mut blob_q);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries);
|
||||||
reconstruct_entries_from_blobs(blob_q, &blob_recycler).unwrap(),
|
|
||||||
entries
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -167,7 +155,7 @@ mod tests {
|
||||||
let blob_recycler = BlobRecycler::default();
|
let blob_recycler = BlobRecycler::default();
|
||||||
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
|
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
|
||||||
let blobs_q = packet::to_blobs(vec![(0, addr)], &blob_recycler).unwrap(); // <-- attack!
|
let blobs_q = packet::to_blobs(vec![(0, addr)], &blob_recycler).unwrap(); // <-- attack!
|
||||||
assert!(reconstruct_entries_from_blobs(blobs_q, &blob_recycler).is_err());
|
assert!(reconstruct_entries_from_blobs(blobs_q).is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -232,10 +220,7 @@ mod bench {
|
||||||
bencher.iter(|| {
|
bencher.iter(|| {
|
||||||
let mut blob_q = VecDeque::new();
|
let mut blob_q = VecDeque::new();
|
||||||
entries.to_blobs(&blob_recycler, &mut blob_q);
|
entries.to_blobs(&blob_recycler, &mut blob_q);
|
||||||
assert_eq!(
|
assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries);
|
||||||
reconstruct_entries_from_blobs(blob_q, &blob_recycler).unwrap(),
|
|
||||||
entries
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
|
|
||||||
use bank::Bank;
|
use bank::Bank;
|
||||||
use ledger;
|
use ledger;
|
||||||
use packet::BlobRecycler;
|
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -16,15 +15,11 @@ pub struct ReplicateStage {
|
||||||
|
|
||||||
impl ReplicateStage {
|
impl ReplicateStage {
|
||||||
/// Process entry blobs, already in order
|
/// Process entry blobs, already in order
|
||||||
fn replicate_requests(
|
fn replicate_requests(bank: &Arc<Bank>, blob_receiver: &BlobReceiver) -> Result<()> {
|
||||||
bank: &Arc<Bank>,
|
|
||||||
blob_receiver: &BlobReceiver,
|
|
||||||
blob_recycler: &BlobRecycler,
|
|
||||||
) -> Result<()> {
|
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let blobs = blob_receiver.recv_timeout(timer)?;
|
let blobs = blob_receiver.recv_timeout(timer)?;
|
||||||
let blobs_len = blobs.len();
|
let blobs_len = blobs.len();
|
||||||
let entries = ledger::reconstruct_entries_from_blobs(blobs, &blob_recycler)?;
|
let entries = ledger::reconstruct_entries_from_blobs(blobs)?;
|
||||||
let res = bank.process_entries(entries);
|
let res = bank.process_entries(entries);
|
||||||
if res.is_err() {
|
if res.is_err() {
|
||||||
error!("process_entries {} {:?}", blobs_len, res);
|
error!("process_entries {} {:?}", blobs_len, res);
|
||||||
|
@ -33,16 +28,11 @@ impl ReplicateStage {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new(
|
pub fn new(bank: Arc<Bank>, exit: Arc<AtomicBool>, window_receiver: BlobReceiver) -> Self {
|
||||||
bank: Arc<Bank>,
|
|
||||||
exit: Arc<AtomicBool>,
|
|
||||||
window_receiver: BlobReceiver,
|
|
||||||
blob_recycler: BlobRecycler,
|
|
||||||
) -> Self {
|
|
||||||
let thread_hdl = Builder::new()
|
let thread_hdl = Builder::new()
|
||||||
.name("solana-replicate-stage".to_string())
|
.name("solana-replicate-stage".to_string())
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
let e = Self::replicate_requests(&bank, &window_receiver, &blob_recycler);
|
let e = Self::replicate_requests(&bank, &window_receiver);
|
||||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,8 +91,7 @@ impl Tvu {
|
||||||
fetch_stage.blob_receiver,
|
fetch_stage.blob_receiver,
|
||||||
);
|
);
|
||||||
|
|
||||||
let replicate_stage =
|
let replicate_stage = ReplicateStage::new(bank, exit, window_stage.blob_receiver);
|
||||||
ReplicateStage::new(bank, exit, window_stage.blob_receiver, blob_recycler);
|
|
||||||
|
|
||||||
let mut threads = vec![replicate_stage.thread_hdl];
|
let mut threads = vec![replicate_stage.thread_hdl];
|
||||||
threads.extend(fetch_stage.thread_hdls.into_iter());
|
threads.extend(fetch_stage.thread_hdls.into_iter());
|
||||||
|
|
Loading…
Reference in New Issue