revert revert of kill window (#2427)

* remove window code from most places
* window used only for testing
* remove unnecessary clippy directives

This commit is contained in:
parent e9116736cd
commit 022a97da99
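For orientation before the hunks below: the heart of this revert-of-revert is that blob indexing no longer goes through the window. The new `index_blobs` in the last hunk of the diff (from `src/packet.rs`, judging by the `impl Blob` context) takes a slice of blobs plus a parallel slice of slot heights instead of an iterator of `(blob, slot)` pairs. A minimal standalone sketch of that behavior, using simplified stand-in types rather than the crate's real `SharedBlob`/`Pubkey`:

```rust
// Simplified stand-in for the crate's blob type; only the index/slot/id
// bookkeeping matters for this illustration.
#[derive(Default, Debug)]
struct Blob {
    index: u64,
    slot: u64,
    id: String,
}

// Mirrors the new signature shown in the diff: blobs and slots are parallel
// slices, and the running index is advanced once per blob.
fn index_blobs(blobs: &mut [Blob], id: &str, mut index: u64, slots: &[u64]) {
    for (blob, slot) in blobs.iter_mut().zip(slots) {
        blob.index = index;
        blob.slot = *slot;
        blob.id = id.to_string();
        index += 1;
    }
}

fn main() {
    let mut blobs = vec![Blob::default(), Blob::default(), Blob::default()];
    index_blobs(&mut blobs, "leader", 10, &[5, 5, 6]);
    println!("first blob: {:?}", blobs[0]);
    assert_eq!(blobs[2].index, 12);
    assert_eq!(blobs[2].slot, 6);
}
```

The real function in the diff additionally clears each blob's flags and works through the blobs' read/write locks.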
@ -7,12 +7,11 @@ use crate::db_ledger::DbLedger;
use crate::entry::Entry;
use crate::entry::EntrySlice;
#[cfg(feature = "erasure")]
use crate::erasure;
use crate::erasure::CodingGenerator;
use crate::leader_scheduler::LeaderScheduler;
use crate::packet::{index_blobs, SharedBlob};
use crate::packet::index_blobs;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::window::{SharedWindow, WindowIndex, WindowUtil};
use log::Level;
use rayon::prelude::*;
use solana_metrics::{influxdb, submit};
@ -32,21 +31,24 @@ pub enum BroadcastServiceReturnType {
ExitSignal,
}

#[allow(clippy::too_many_arguments)]
fn broadcast(
db_ledger: &Arc<DbLedger>,
struct Broadcast {
id: Pubkey,
max_tick_height: Option<u64>,
leader_id: Pubkey,
node_info: &NodeInfo,
blob_index: u64,

#[cfg(feature = "erasure")]
coding_generator: CodingGenerator,
}

impl Broadcast {
fn run(
&mut self,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
receiver: &Receiver<Vec<Entry>>,
sock: &UdpSocket,
transmit_index: &mut WindowIndex,
receive_index: &mut u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> {
let id = node_info.id;
db_ledger: &Arc<DbLedger>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let entries = receiver.recv_timeout(timer)?;
let now = Instant::now();
@ -54,117 +56,63 @@ fn broadcast(
|
|||
let mut ventries = Vec::new();
|
||||
ventries.push(entries);
|
||||
|
||||
let mut contains_last_tick = false;
|
||||
while let Ok(entries) = receiver.try_recv() {
|
||||
num_entries += entries.len();
|
||||
ventries.push(entries);
|
||||
}
|
||||
|
||||
let last_tick = match self.max_tick_height {
|
||||
Some(max_tick_height) => {
|
||||
if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) {
|
||||
contains_last_tick |= Some(last.tick_height) == max_tick_height;
|
||||
last.tick_height == max_tick_height
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
None => false,
|
||||
};
|
||||
|
||||
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
|
||||
|
||||
let to_blobs_start = Instant::now();
|
||||
|
||||
// Generate the slot heights for all the entries inside ventries
|
||||
let slot_heights = generate_slots(&ventries, leader_scheduler);
|
||||
// this may span slots if this leader broadcasts for consecutive slots...
|
||||
let slots = generate_slots(&ventries, leader_scheduler);
|
||||
|
||||
let blobs: Vec<_> = ventries
|
||||
.into_par_iter()
|
||||
.flat_map(|p| p.to_shared_blobs())
|
||||
.collect();
|
||||
|
||||
let blobs_slot_heights: Vec<(SharedBlob, u64)> = blobs.into_iter().zip(slot_heights).collect();
|
||||
// TODO: blob_index should be slot-relative...
|
||||
index_blobs(&blobs, &self.id, self.blob_index, &slots);
|
||||
|
||||
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
|
||||
|
||||
let blobs_chunking = Instant::now();
|
||||
// We could receive more blobs than window slots so
|
||||
// break them up into window-sized chunks to process
|
||||
let window_size = window.read().unwrap().window_size();
|
||||
let blobs_chunked = blobs_slot_heights
|
||||
.chunks(window_size as usize)
|
||||
.map(|x| x.to_vec());
|
||||
let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed());
|
||||
|
||||
let broadcast_start = Instant::now();
|
||||
for blobs in blobs_chunked {
|
||||
let blobs_len = blobs.len();
|
||||
trace!("{}: broadcast blobs.len: {}", id, blobs_len);
|
||||
|
||||
index_blobs(blobs.iter(), &node_info.id, *receive_index);
|
||||
|
||||
// keep the cache of blobs that are broadcast
|
||||
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
|
||||
{
|
||||
let mut win = window.write().unwrap();
|
||||
assert!(blobs.len() <= win.len());
|
||||
let blobs: Vec<_> = blobs.into_iter().map(|(b, _)| b).collect();
|
||||
for b in &blobs {
|
||||
let ix = b.read().unwrap().index().expect("blob index");
|
||||
let pos = (ix % window_size) as usize;
|
||||
if let Some(x) = win[pos].data.take() {
|
||||
trace!(
|
||||
"{} popped {} at {}",
|
||||
id,
|
||||
x.read().unwrap().index().unwrap(),
|
||||
pos
|
||||
);
|
||||
}
|
||||
if let Some(x) = win[pos].coding.take() {
|
||||
trace!(
|
||||
"{} popped {} at {}",
|
||||
id,
|
||||
x.read().unwrap().index().unwrap(),
|
||||
pos
|
||||
);
|
||||
}
|
||||
|
||||
trace!("{} null {}", id, pos);
|
||||
}
|
||||
for b in &blobs {
|
||||
{
|
||||
let ix = b.read().unwrap().index().expect("blob index");
|
||||
let pos = (ix % window_size) as usize;
|
||||
trace!("{} caching {} at {}", id, ix, pos);
|
||||
assert!(win[pos].data.is_none());
|
||||
win[pos].data = Some(b.clone());
|
||||
}
|
||||
}
|
||||
|
||||
db_ledger
|
||||
.write_consecutive_blobs(&blobs)
|
||||
.expect("Unrecoverable failure to write to database");
|
||||
}
|
||||
|
||||
// don't count coding blobs in the blob indexes
|
||||
self.blob_index += blobs.len() as u64;
|
||||
|
||||
// Send out data
|
||||
ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?;
|
||||
|
||||
// Fill in the coding blob data from the window data blobs
|
||||
#[cfg(feature = "erasure")]
|
||||
{
|
||||
erasure::generate_coding(
|
||||
&id,
|
||||
&mut window.write().unwrap(),
|
||||
*receive_index,
|
||||
blobs_len,
|
||||
&mut transmit_index.coding,
|
||||
)?;
|
||||
let coding = self.coding_generator.next(&blobs)?;
|
||||
|
||||
// send out erasures
|
||||
ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
|
||||
}
|
||||
|
||||
*receive_index += blobs_len as u64;
|
||||
|
||||
// Send blobs out from the window
|
||||
ClusterInfo::broadcast(
|
||||
contains_last_tick,
|
||||
leader_id,
|
||||
&node_info,
|
||||
&broadcast_table,
|
||||
&window,
|
||||
&sock,
|
||||
transmit_index,
|
||||
*receive_index,
|
||||
)?;
|
||||
}
|
||||
let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
|
||||
|
||||
inc_new_counter_info!(
|
||||
|
@ -172,20 +120,21 @@ fn broadcast(
|
|||
duration_as_ms(&now.elapsed()) as usize
|
||||
);
|
||||
info!(
|
||||
"broadcast: {} entries, blob time {} chunking time {} broadcast time {}",
|
||||
num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed
|
||||
"broadcast: {} entries, blob time {} broadcast time {}",
|
||||
num_entries, to_blobs_elapsed, broadcast_elapsed
|
||||
);
|
||||
|
||||
submit(
|
||||
influxdb::Point::new("broadcast-service")
|
||||
.add_field(
|
||||
"transmit-index",
|
||||
influxdb::Value::Integer(transmit_index.data as i64),
|
||||
influxdb::Value::Integer(self.blob_index as i64),
|
||||
)
|
||||
.to_owned(),
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_slots(
|
||||
|
@ -240,46 +189,41 @@ pub struct BroadcastService {
|
|||
}
|
||||
|
||||
impl BroadcastService {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn run(
|
||||
db_ledger: &Arc<DbLedger>,
|
||||
bank: &Arc<Bank>,
|
||||
sock: &UdpSocket,
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
window: &SharedWindow,
|
||||
entry_height: u64,
|
||||
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
|
||||
receiver: &Receiver<Vec<Entry>>,
|
||||
max_tick_height: Option<u64>,
|
||||
exit_signal: &Arc<AtomicBool>,
|
||||
) -> BroadcastServiceReturnType {
|
||||
let mut transmit_index = WindowIndex {
|
||||
data: entry_height,
|
||||
coding: entry_height,
|
||||
};
|
||||
let mut receive_index = entry_height;
|
||||
let me = cluster_info.read().unwrap().my_data().clone();
|
||||
|
||||
let mut broadcast = Broadcast {
|
||||
id: me.id,
|
||||
max_tick_height,
|
||||
blob_index: entry_height,
|
||||
#[cfg(feature = "erasure")]
|
||||
coding_generator: CodingGenerator::new(),
|
||||
};
|
||||
|
||||
loop {
|
||||
if exit_signal.load(Ordering::Relaxed) {
|
||||
return BroadcastServiceReturnType::ExitSignal;
|
||||
}
|
||||
let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(&bank);
|
||||
// Layer 1 nodes are limited to the fanout size.
|
||||
// Layer 1, leader nodes are limited to the fanout size.
|
||||
broadcast_table.truncate(DATA_PLANE_FANOUT);
|
||||
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
|
||||
let leader_id = cluster_info.read().unwrap().leader_id();
|
||||
if let Err(e) = broadcast(
|
||||
db_ledger,
|
||||
max_tick_height,
|
||||
leader_id,
|
||||
&me,
|
||||
if let Err(e) = broadcast.run(
|
||||
&broadcast_table,
|
||||
&window,
|
||||
&receiver,
|
||||
&sock,
|
||||
&mut transmit_index,
|
||||
&mut receive_index,
|
||||
receiver,
|
||||
sock,
|
||||
leader_scheduler,
|
||||
db_ledger,
|
||||
) {
|
||||
match e {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
|
||||
|
@ -311,13 +255,11 @@ impl BroadcastService {
|
|||
/// WriteStage is the last stage in the pipeline), which will then close Broadcast service,
|
||||
/// which will then close FetchStage in the Tpu, and then the rest of the Tpu,
|
||||
/// completing the cycle.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
db_ledger: Arc<DbLedger>,
|
||||
bank: Arc<Bank>,
|
||||
sock: UdpSocket,
|
||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||
window: SharedWindow,
|
||||
entry_height: u64,
|
||||
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
|
||||
receiver: Receiver<Vec<Entry>>,
|
||||
|
@ -334,7 +276,6 @@ impl BroadcastService {
|
|||
&bank,
|
||||
&sock,
|
||||
&cluster_info,
|
||||
&window,
|
||||
entry_height,
|
||||
&leader_scheduler,
|
||||
&receiver,
|
||||
|
@ -364,7 +305,6 @@ mod test {
|
|||
use crate::db_ledger::DbLedger;
|
||||
use crate::entry::create_ticks;
|
||||
use crate::service::Service;
|
||||
use crate::window::new_window;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use std::sync::atomic::AtomicBool;
|
||||
|
@ -401,8 +341,6 @@ mod test {
|
|||
cluster_info.insert_info(broadcast_buddy.info);
|
||||
let cluster_info = Arc::new(RwLock::new(cluster_info));
|
||||
|
||||
let window = new_window(32 * 1024);
|
||||
let shared_window = Arc::new(RwLock::new(window));
|
||||
let exit_sender = Arc::new(AtomicBool::new(false));
|
||||
let bank = Arc::new(Bank::default());
|
||||
|
||||
|
@ -412,7 +350,6 @@ mod test {
|
|||
bank.clone(),
|
||||
leader_info.sockets.broadcast,
|
||||
cluster_info,
|
||||
shared_window,
|
||||
entry_height,
|
||||
leader_scheduler,
|
||||
entry_receiver,
|
||||
|
|
|
@ -25,7 +25,6 @@ use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
use crate::result::Result;
use crate::rpc::RPC_PORT;
use crate::streamer::{BlobReceiver, BlobSender};
use crate::window::{SharedWindow, WindowIndex};
use bincode::{deserialize, serialize};
use hashbrown::HashMap;
use log::Level;
@ -493,58 +492,33 @@ impl ClusterInfo {
|
|||
|
||||
/// broadcast messages from the leader to layer 1 nodes
|
||||
/// # Remarks
|
||||
/// We need to avoid having obj locked while doing any io, such as the `send_to`
|
||||
pub fn broadcast(
|
||||
id: &Pubkey,
|
||||
contains_last_tick: bool,
|
||||
leader_id: Pubkey,
|
||||
me: &NodeInfo,
|
||||
broadcast_table: &[NodeInfo],
|
||||
window: &SharedWindow,
|
||||
s: &UdpSocket,
|
||||
transmit_index: &mut WindowIndex,
|
||||
received_index: u64,
|
||||
blobs: &[SharedBlob],
|
||||
) -> Result<()> {
|
||||
if broadcast_table.is_empty() {
|
||||
debug!("{}:not enough peers in cluster_info table", me.id);
|
||||
debug!("{}:not enough peers in cluster_info table", id);
|
||||
inc_new_counter_info!("cluster_info-broadcast-not_enough_peers_error", 1);
|
||||
Err(ClusterInfoError::NoPeers)?;
|
||||
}
|
||||
trace!(
|
||||
"{} transmit_index: {:?} received_index: {} broadcast_len: {}",
|
||||
me.id,
|
||||
*transmit_index,
|
||||
received_index,
|
||||
broadcast_table.len()
|
||||
);
|
||||
|
||||
let old_transmit_index = transmit_index.data;
|
||||
let orders = Self::create_broadcast_orders(contains_last_tick, blobs, broadcast_table);
|
||||
|
||||
let orders = Self::create_broadcast_orders(
|
||||
contains_last_tick,
|
||||
window,
|
||||
broadcast_table,
|
||||
transmit_index,
|
||||
received_index,
|
||||
me,
|
||||
);
|
||||
trace!("broadcast orders table {}", orders.len());
|
||||
|
||||
let errs = Self::send_orders(s, orders, me, leader_id);
|
||||
let errs = Self::send_orders(id, s, orders);
|
||||
|
||||
for e in errs {
|
||||
if let Err(e) = &e {
|
||||
trace!("broadcast result {:?}", e);
|
||||
trace!("{}: broadcast result {:?}", id, e);
|
||||
}
|
||||
e?;
|
||||
if transmit_index.data < received_index {
|
||||
transmit_index.data += 1;
|
||||
}
|
||||
}
|
||||
inc_new_counter_info!(
|
||||
"cluster_info-broadcast-max_idx",
|
||||
(transmit_index.data - old_transmit_index) as usize
|
||||
);
|
||||
transmit_index.coding = transmit_index.data;
|
||||
|
||||
inc_new_counter_info!("cluster_info-broadcast-max_idx", blobs.len());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -603,19 +577,15 @@ impl ClusterInfo {
|
|||
}
|
||||
|
||||
fn send_orders(
|
||||
id: &Pubkey,
|
||||
s: &UdpSocket,
|
||||
orders: Vec<(Option<SharedBlob>, Vec<&NodeInfo>)>,
|
||||
me: &NodeInfo,
|
||||
leader_id: Pubkey,
|
||||
orders: Vec<(SharedBlob, Vec<&NodeInfo>)>,
|
||||
) -> Vec<io::Result<usize>> {
|
||||
orders
|
||||
.into_iter()
|
||||
.flat_map(|(b, vs)| {
|
||||
// only leader should be broadcasting
|
||||
assert!(vs.iter().find(|info| info.id == leader_id).is_none());
|
||||
let bl = b.unwrap();
|
||||
let blob = bl.read().unwrap();
|
||||
//TODO profile this, may need multiple sockets for par_iter
|
||||
let blob = b.read().unwrap();
|
||||
|
||||
let ids_and_tvus = if log_enabled!(Level::Trace) {
|
||||
let v_ids = vs.iter().map(|v| v.id);
|
||||
let tvus = vs.iter().map(|v| v.tvu);
|
||||
|
@ -623,7 +593,7 @@ impl ClusterInfo {
|
|||
|
||||
trace!(
|
||||
"{}: BROADCAST idx: {} sz: {} to {:?} coding: {}",
|
||||
me.id,
|
||||
id,
|
||||
blob.index().unwrap(),
|
||||
blob.meta.size,
|
||||
ids_and_tvus,
|
||||
|
@ -642,7 +612,7 @@ impl ClusterInfo {
|
|||
let e = s.send_to(&blob.data[..blob.meta.size], &v.tvu);
|
||||
trace!(
|
||||
"{}: done broadcast {} to {:?}",
|
||||
me.id,
|
||||
id,
|
||||
blob.meta.size,
|
||||
ids_and_tvus
|
||||
);
|
||||
|
@ -656,70 +626,36 @@ impl ClusterInfo {
|
|||
|
||||
fn create_broadcast_orders<'a>(
|
||||
contains_last_tick: bool,
|
||||
window: &SharedWindow,
|
||||
blobs: &[SharedBlob],
|
||||
broadcast_table: &'a [NodeInfo],
|
||||
transmit_index: &mut WindowIndex,
|
||||
received_index: u64,
|
||||
me: &NodeInfo,
|
||||
) -> Vec<(Option<SharedBlob>, Vec<&'a NodeInfo>)> {
|
||||
) -> Vec<(SharedBlob, Vec<&'a NodeInfo>)> {
|
||||
// enumerate all the blobs in the window, those are the indices
|
||||
// transmit them to nodes, starting from a different node.
|
||||
let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize);
|
||||
let window_l = window.read().unwrap();
|
||||
let mut br_idx = transmit_index.data as usize % broadcast_table.len();
|
||||
if blobs.is_empty() {
|
||||
return vec![];
|
||||
}
|
||||
|
||||
for idx in transmit_index.data..received_index {
|
||||
let w_idx = idx as usize % window_l.len();
|
||||
let mut orders = Vec::with_capacity(blobs.len());
|
||||
|
||||
trace!(
|
||||
"{} broadcast order data w_idx {} br_idx {}",
|
||||
me.id,
|
||||
w_idx,
|
||||
br_idx
|
||||
);
|
||||
for (i, blob) in blobs.iter().enumerate() {
|
||||
let br_idx = i % broadcast_table.len();
|
||||
|
||||
trace!("broadcast order data br_idx {}", br_idx);
|
||||
|
||||
orders.push((blob.clone(), vec![&broadcast_table[br_idx]]));
|
||||
}
|
||||
|
||||
if contains_last_tick {
|
||||
// Broadcast the last tick to everyone on the network so it doesn't get dropped
|
||||
// (Need to maximize probability the next leader in line sees this handoff tick
|
||||
// despite packet drops)
|
||||
let target = if idx == received_index - 1 && contains_last_tick {
|
||||
// If we see a tick at max_tick_height, then we know it must be the last
|
||||
// Blob in the window, at index == received_index. There cannot be an entry
|
||||
// that got sent after the last tick, guaranteed by the PohService).
|
||||
assert!(window_l[w_idx].data.is_some());
|
||||
(
|
||||
window_l[w_idx].data.clone(),
|
||||
broadcast_table.iter().collect(),
|
||||
)
|
||||
} else {
|
||||
(window_l[w_idx].data.clone(), vec![&broadcast_table[br_idx]])
|
||||
};
|
||||
|
||||
orders.push(target);
|
||||
br_idx += 1;
|
||||
br_idx %= broadcast_table.len();
|
||||
}
|
||||
|
||||
for idx in transmit_index.coding..received_index {
|
||||
let w_idx = idx as usize % window_l.len();
|
||||
|
||||
// skip over empty slots
|
||||
if window_l[w_idx].coding.is_none() {
|
||||
continue;
|
||||
}
|
||||
|
||||
trace!(
|
||||
"{} broadcast order coding w_idx: {} br_idx :{}",
|
||||
me.id,
|
||||
w_idx,
|
||||
br_idx,
|
||||
);
|
||||
|
||||
// If we had a tick at max_tick_height, then we know it must be the last
|
||||
// Blob in the broadcast, There cannot be an entry that got sent after the
|
||||
// last tick, guaranteed by the PohService).
|
||||
orders.push((
|
||||
window_l[w_idx].coding.clone(),
|
||||
vec![&broadcast_table[br_idx]],
|
||||
blobs.last().unwrap().clone(),
|
||||
broadcast_table.iter().collect(),
|
||||
));
|
||||
br_idx += 1;
|
||||
br_idx %= broadcast_table.len();
|
||||
}
|
||||
|
||||
orders
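The new `create_broadcast_orders` above assigns each blob to one peer round-robin and, when the batch contains the last tick, additionally fans the final blob out to every peer so the leader handoff is unlikely to be dropped. A small standalone model of just that selection logic (integers and strings stand in for `SharedBlob` and `NodeInfo`; this is not the crate's actual code):

```rust
// Each blob goes to one peer, round-robin; the last-tick blob is also sent
// to every peer in the table.
fn create_broadcast_orders<'a>(
    contains_last_tick: bool,
    blobs: &[u64],        // blob ids, in place of SharedBlob
    peers: &'a [&'a str], // peer names, in place of NodeInfo
) -> Vec<(u64, Vec<&'a str>)> {
    if blobs.is_empty() {
        return vec![];
    }
    let mut orders = Vec::with_capacity(blobs.len());
    for (i, blob) in blobs.iter().enumerate() {
        orders.push((*blob, vec![peers[i % peers.len()]]));
    }
    if contains_last_tick {
        // Broadcast the last tick to everyone so the handoff isn't dropped.
        orders.push((*blobs.last().unwrap(), peers.to_vec()));
    }
    orders
}

fn main() {
    let orders = create_broadcast_orders(true, &[1, 2, 3], &["a", "b"]);
    assert_eq!(orders.len(), 4);
    assert_eq!(orders[3].1, vec!["a", "b"]);
}
```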
|
||||
|
|
|
@ -976,11 +976,7 @@ mod tests {
|
|||
fn test_read_blobs_bytes() {
|
||||
let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
|
||||
let slot = DEFAULT_SLOT_HEIGHT;
|
||||
index_blobs(
|
||||
shared_blobs.iter().zip(vec![slot; 10].into_iter()),
|
||||
&Keypair::new().pubkey(),
|
||||
0,
|
||||
);
|
||||
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, &[slot; 10]);
|
||||
|
||||
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
|
||||
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
|
||||
|
|
|
@ -562,9 +562,10 @@ mod test {
|
|||
let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
|
||||
|
||||
index_blobs(
|
||||
shared_blobs.iter().zip(vec![slot; num_entries].into_iter()),
|
||||
&shared_blobs,
|
||||
&Keypair::new().pubkey(),
|
||||
0,
|
||||
&vec![slot; num_entries],
|
||||
);
|
||||
|
||||
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
|
||||
|
@ -657,11 +658,10 @@ mod test {
|
|||
let shared_blobs = original_entries.clone().to_shared_blobs();
|
||||
|
||||
index_blobs(
|
||||
shared_blobs
|
||||
.iter()
|
||||
.zip(vec![DEFAULT_SLOT_HEIGHT; num_entries].into_iter()),
|
||||
&shared_blobs,
|
||||
&Keypair::new().pubkey(),
|
||||
0,
|
||||
&vec![DEFAULT_SLOT_HEIGHT; num_entries],
|
||||
);
|
||||
|
||||
let mut consume_queue = vec![];
|
||||
|
src/erasure.rs
@ -2,8 +2,6 @@
use crate::db_ledger::DbLedger;
use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
use crate::result::{Error, Result};
use crate::window::WindowSlot;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::sync::{Arc, RwLock};

@ -177,6 +175,79 @@ pub fn decode_blocks(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn decode_blobs(
|
||||
blobs: &[SharedBlob],
|
||||
erasures: &[i32],
|
||||
size: usize,
|
||||
block_start_idx: u64,
|
||||
slot: u64,
|
||||
) -> Result<bool> {
|
||||
let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
|
||||
let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
|
||||
let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
|
||||
|
||||
assert!(blobs.len() == NUM_DATA + NUM_CODING);
|
||||
for b in blobs {
|
||||
locks.push(b.write().unwrap());
|
||||
}
|
||||
|
||||
for (i, l) in locks.iter_mut().enumerate() {
|
||||
if i < NUM_DATA {
|
||||
data_ptrs.push(&mut l.data[..size]);
|
||||
} else {
|
||||
coding_ptrs.push(&mut l.data_mut()[..size]);
|
||||
}
|
||||
}
|
||||
|
||||
// Decode the blocks
|
||||
decode_blocks(
|
||||
data_ptrs.as_mut_slice(),
|
||||
coding_ptrs.as_mut_slice(),
|
||||
&erasures,
|
||||
)?;
|
||||
|
||||
// Create the missing blobs from the reconstructed data
|
||||
let mut corrupt = false;
|
||||
|
||||
for i in &erasures[..erasures.len() - 1] {
|
||||
let n = *i as usize;
|
||||
let mut idx = n as u64 + block_start_idx;
|
||||
|
||||
let mut data_size;
|
||||
if n < NUM_DATA {
|
||||
data_size = locks[n].data_size().unwrap() as usize;
|
||||
data_size -= BLOB_HEADER_SIZE;
|
||||
if data_size > BLOB_DATA_SIZE {
|
||||
error!("corrupt data blob[{}] data_size: {}", idx, data_size);
|
||||
corrupt = true;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
data_size = size;
|
||||
idx -= NUM_CODING as u64;
|
||||
locks[n].set_slot(slot).unwrap();
|
||||
locks[n].set_index(idx).unwrap();
|
||||
|
||||
if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
|
||||
error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
|
||||
corrupt = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
locks[n].set_size(data_size);
|
||||
trace!(
|
||||
"erasures[{}] ({}) size: {} data[0]: {}",
|
||||
*i,
|
||||
idx,
|
||||
data_size,
|
||||
locks[n].data()[0]
|
||||
);
|
||||
}
|
||||
|
||||
Ok(corrupt)
|
||||
}
|
||||
|
||||
// Generate coding blocks in window starting from start_idx,
|
||||
// for num_blobs.. For each block place the coding blobs
|
||||
// at the end of the block like so:
|
||||
|
@ -214,13 +285,413 @@ pub fn decode_blocks(
|
|||
//
|
||||
//
|
||||
//
|
||||
pub fn generate_coding(
|
||||
pub struct CodingGenerator {
|
||||
leftover: Vec<SharedBlob>, // SharedBlobs that couldn't be used in last call to next()
|
||||
}
|
||||
|
||||
impl CodingGenerator {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
leftover: Vec::with_capacity(NUM_DATA),
|
||||
}
|
||||
}
|
||||
|
||||
// must be called with consecutive data blobs from previous invocation
|
||||
pub fn next(&mut self, next_data: &[SharedBlob]) -> Result<Vec<SharedBlob>> {
|
||||
let mut next_coding =
|
||||
Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING);
|
||||
|
||||
let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect();
|
||||
|
||||
for data_blobs in next_data.chunks(NUM_DATA) {
|
||||
if data_blobs.len() < NUM_DATA {
|
||||
self.leftover = data_blobs.to_vec();
|
||||
break;
|
||||
}
|
||||
self.leftover.clear();
|
||||
|
||||
// find max_data_size for the chunk
|
||||
let max_data_size = align!(
|
||||
data_blobs
|
||||
.iter()
|
||||
.fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)),
|
||||
JERASURE_ALIGN
|
||||
);
|
||||
|
||||
let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
|
||||
let data_ptrs: Vec<_> = data_locks
|
||||
.iter()
|
||||
.map(|l| &l.data[..max_data_size])
|
||||
.collect();
|
||||
|
||||
let mut coding_blobs = Vec::with_capacity(NUM_CODING);
|
||||
|
||||
for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] {
|
||||
let index = data_blob.index().unwrap();
|
||||
let slot = data_blob.slot().unwrap();
|
||||
let id = data_blob.id().unwrap();
|
||||
|
||||
let coding_blob = SharedBlob::default();
|
||||
{
|
||||
let mut coding_blob = coding_blob.write().unwrap();
|
||||
coding_blob.set_index(index).unwrap();
|
||||
coding_blob.set_slot(slot).unwrap();
|
||||
coding_blob.set_id(&id).unwrap();
|
||||
coding_blob.set_size(max_data_size);
|
||||
coding_blob.set_coding().unwrap();
|
||||
}
|
||||
coding_blobs.push(coding_blob);
|
||||
}
|
||||
|
||||
{
|
||||
let mut coding_locks: Vec<_> =
|
||||
coding_blobs.iter().map(|b| b.write().unwrap()).collect();
|
||||
|
||||
let mut coding_ptrs: Vec<_> = coding_locks
|
||||
.iter_mut()
|
||||
.map(|l| &mut l.data_mut()[..max_data_size])
|
||||
.collect();
|
||||
|
||||
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
|
||||
}
|
||||
next_coding.append(&mut coding_blobs);
|
||||
}
|
||||
|
||||
Ok(next_coding)
|
||||
}
|
||||
}
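`CodingGenerator::next()` above buffers data blobs across calls: a partial block is kept as `leftover`, and once `NUM_DATA` consecutive blobs are available it emits `NUM_CODING` coding blobs for that block. A standalone sketch of just that chunk-and-carry behavior, with the erasure math elided and plain integers standing in for blobs:

```rust
// Toy block sizes; the real constants live in erasure.rs.
const NUM_DATA: usize = 8;
const NUM_CODING: usize = 2;

#[derive(Default)]
struct CodingGenerator {
    leftover: Vec<u32>, // partial block carried over from the previous call
}

impl CodingGenerator {
    fn next(&mut self, next_data: &[u32]) -> Vec<u32> {
        // Prepend whatever was left over last time.
        let data: Vec<u32> = self.leftover.iter().chain(next_data).cloned().collect();
        let mut coding = Vec::new();
        for chunk in data.chunks(NUM_DATA) {
            if chunk.len() < NUM_DATA {
                // Not a full block yet: keep it for the next call.
                self.leftover = chunk.to_vec();
                break;
            }
            self.leftover.clear();
            // One "coding blob" per slot at the tail of the block; the real
            // generator fills these via generate_coding_blocks().
            coding.extend_from_slice(&chunk[NUM_DATA - NUM_CODING..]);
        }
        coding
    }
}

fn main() {
    let mut gen = CodingGenerator::default();
    assert!(gen.next(&[0, 1, 2]).is_empty()); // partial block: carried over
    assert_eq!(gen.next(&(3..8).collect::<Vec<_>>()), vec![6, 7]); // block complete
}
```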
|
||||
|
||||
// Recover the missing data and coding blobs from the input ledger. Returns a vector
|
||||
// of the recovered missing data blobs and a vector of the recovered coding blobs
|
||||
pub fn recover(
|
||||
db_ledger: &Arc<DbLedger>,
|
||||
slot: u64,
|
||||
start_idx: u64,
|
||||
) -> Result<(Vec<SharedBlob>, Vec<SharedBlob>)> {
|
||||
let block_start_idx = start_idx - (start_idx % NUM_DATA as u64);
|
||||
|
||||
debug!("block_start_idx: {}", block_start_idx);
|
||||
|
||||
let coding_start_idx = block_start_idx + NUM_DATA as u64 - NUM_CODING as u64;
|
||||
let block_end_idx = block_start_idx + NUM_DATA as u64;
|
||||
trace!(
|
||||
"recover: coding_start_idx: {} block_end_idx: {}",
|
||||
coding_start_idx,
|
||||
block_end_idx
|
||||
);
|
||||
|
||||
let data_missing = db_ledger
|
||||
.find_missing_data_indexes(slot, block_start_idx, block_end_idx, NUM_DATA)
|
||||
.len();
|
||||
let coding_missing = db_ledger
|
||||
.find_missing_coding_indexes(slot, coding_start_idx, block_end_idx, NUM_CODING)
|
||||
.len();
|
||||
|
||||
// if we're not missing data, or if we have too much missing but have enough coding
|
||||
if data_missing == 0 {
|
||||
// nothing to do...
|
||||
return Ok((vec![], vec![]));
|
||||
}
|
||||
|
||||
if (data_missing + coding_missing) > NUM_CODING {
|
||||
trace!(
|
||||
"recover: start: {} skipping recovery data: {} coding: {}",
|
||||
block_start_idx,
|
||||
data_missing,
|
||||
coding_missing
|
||||
);
|
||||
// nothing to do...
|
||||
return Err(Error::ErasureError(ErasureError::NotEnoughBlocksToDecode));
|
||||
}
|
||||
|
||||
trace!(
|
||||
"recover: recovering: data: {} coding: {}",
|
||||
data_missing,
|
||||
coding_missing
|
||||
);
|
||||
|
||||
let mut blobs: Vec<SharedBlob> = Vec::with_capacity(NUM_DATA + NUM_CODING);
|
||||
let mut erasures: Vec<i32> = Vec::with_capacity(NUM_CODING);
|
||||
|
||||
let mut missing_data: Vec<SharedBlob> = vec![];
|
||||
let mut missing_coding: Vec<SharedBlob> = vec![];
|
||||
|
||||
// Add the data blobs we have into the recovery vector, mark the missing ones
|
||||
for i in block_start_idx..block_end_idx {
|
||||
let result = db_ledger.get_data_blob_bytes(slot, i)?;
|
||||
|
||||
categorize_blob(
|
||||
&result,
|
||||
&mut blobs,
|
||||
&mut missing_data,
|
||||
&mut erasures,
|
||||
(i - block_start_idx) as i32,
|
||||
)?;
|
||||
}
|
||||
|
||||
let mut size = None;
|
||||
// Add the coding blobs we have into the recovery vector, mark the missing ones
|
||||
for i in coding_start_idx..block_end_idx {
|
||||
let result = db_ledger.get_coding_blob_bytes(slot, i)?;
|
||||
|
||||
categorize_blob(
|
||||
&result,
|
||||
&mut blobs,
|
||||
&mut missing_coding,
|
||||
&mut erasures,
|
||||
((i - coding_start_idx) + NUM_DATA as u64) as i32,
|
||||
)?;
|
||||
|
||||
if let Some(b) = result {
|
||||
if size.is_none() {
|
||||
size = Some(b.len() - BLOB_HEADER_SIZE);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Due to checks above verifying that (data_missing + coding_missing) <= NUM_CODING and
|
||||
// data_missing > 0, we know at least one coding block must exist, so "size" can
|
||||
// not remain None after the above processing.
|
||||
let size = size.unwrap();
|
||||
|
||||
// marks end of erasures
|
||||
erasures.push(-1);
|
||||
|
||||
trace!("erasures[]:{:?} data_size: {}", erasures, size,);
|
||||
|
||||
let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx, slot)?;
|
||||
|
||||
if corrupt {
|
||||
// Remove the corrupted coding blobs so there's no effort wasted in trying to
|
||||
// reconstruct the blobs again
|
||||
for i in coding_start_idx..block_end_idx {
|
||||
db_ledger.delete_coding_blob(slot, i)?;
|
||||
}
|
||||
return Ok((vec![], vec![]));
|
||||
}
|
||||
|
||||
Ok((missing_data, missing_coding))
|
||||
}
|
||||
|
||||
fn categorize_blob(
|
||||
get_blob_result: &Option<Vec<u8>>,
|
||||
blobs: &mut Vec<SharedBlob>,
|
||||
missing: &mut Vec<SharedBlob>,
|
||||
erasures: &mut Vec<i32>,
|
||||
erasure_index: i32,
|
||||
) -> Result<()> {
|
||||
match get_blob_result {
|
||||
Some(b) => {
|
||||
if b.len() <= BLOB_HEADER_SIZE || b.len() > BLOB_SIZE {
|
||||
return Err(Error::ErasureError(ErasureError::InvalidBlobData));
|
||||
}
|
||||
blobs.push(Arc::new(RwLock::new(Blob::new(&b))));
|
||||
}
|
||||
None => {
|
||||
// Mark the missing memory
|
||||
erasures.push(erasure_index);
|
||||
let b = SharedBlob::default();
|
||||
blobs.push(b.clone());
|
||||
missing.push(b);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use super::*;
|
||||
use crate::db_ledger::get_tmp_ledger_path;
|
||||
use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT};
|
||||
use crate::entry::{make_tiny_test_entries, EntrySlice};
|
||||
|
||||
use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
|
||||
use crate::window::WindowSlot;
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[test]
|
||||
fn test_coding() {
|
||||
let zero_vec = vec![0; 16];
|
||||
let mut vs: Vec<Vec<u8>> = (0..4).map(|i| (i..(16 + i)).collect()).collect();
|
||||
let v_orig: Vec<u8> = vs[0].clone();
|
||||
|
||||
let m = 2;
|
||||
let mut coding_blocks: Vec<_> = (0..m).map(|_| vec![0u8; 16]).collect();
|
||||
|
||||
{
|
||||
let mut coding_blocks_slices: Vec<_> =
|
||||
coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect();
|
||||
let v_slices: Vec<_> = vs.iter().map(|x| x.as_slice()).collect();
|
||||
|
||||
assert!(generate_coding_blocks(
|
||||
coding_blocks_slices.as_mut_slice(),
|
||||
v_slices.as_slice(),
|
||||
)
|
||||
.is_ok());
|
||||
}
|
||||
trace!("coding blocks:");
|
||||
for b in &coding_blocks {
|
||||
trace!("{:?}", b);
|
||||
}
|
||||
let erasure: i32 = 1;
|
||||
let erasures = vec![erasure, -1];
|
||||
// clear an entry
|
||||
vs[erasure as usize].copy_from_slice(zero_vec.as_slice());
|
||||
|
||||
{
|
||||
let mut coding_blocks_slices: Vec<_> =
|
||||
coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect();
|
||||
let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect();
|
||||
|
||||
assert!(decode_blocks(
|
||||
v_slices.as_mut_slice(),
|
||||
coding_blocks_slices.as_mut_slice(),
|
||||
erasures.as_slice(),
|
||||
)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
trace!("vs:");
|
||||
for v in &vs {
|
||||
trace!("{:?}", v);
|
||||
}
|
||||
assert_eq!(v_orig, vs[0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_erasure_generate_coding() {
|
||||
solana_logger::setup();
|
||||
|
||||
// trivial case
|
||||
let mut coding_generator = CodingGenerator::new();
|
||||
let blobs = Vec::new();
|
||||
for _ in 0..NUM_DATA * 2 {
|
||||
let coding = coding_generator.next(&blobs).unwrap();
|
||||
assert_eq!(coding.len(), 0);
|
||||
}
|
||||
|
||||
// test coding by iterating one blob at a time
|
||||
let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
|
||||
|
||||
for (i, blob) in data_blobs.iter().cloned().enumerate() {
|
||||
let coding = coding_generator.next(&[blob]).unwrap();
|
||||
|
||||
if !coding.is_empty() {
|
||||
assert_eq!(i % NUM_DATA, NUM_DATA - 1);
|
||||
assert_eq!(coding.len(), NUM_CODING);
|
||||
|
||||
let size = coding[0].read().unwrap().size().unwrap();
|
||||
|
||||
// toss one data and one coding
|
||||
let erasures: Vec<i32> = vec![0, NUM_DATA as i32, -1];
|
||||
|
||||
let block_start_idx = i - (i % NUM_DATA);
|
||||
let mut blobs: Vec<SharedBlob> = Vec::with_capacity(NUM_DATA + NUM_CODING);
|
||||
|
||||
blobs.push(SharedBlob::default()); // empty data, erasure at zero
|
||||
for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] {
|
||||
// skip first blob
|
||||
blobs.push(blob.clone());
|
||||
}
|
||||
blobs.push(SharedBlob::default()); // empty coding, erasure at NUM_DATA
|
||||
for blob in &coding[1..NUM_CODING] {
|
||||
blobs.push(blob.clone());
|
||||
}
|
||||
|
||||
let corrupt =
|
||||
decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap();
|
||||
|
||||
assert!(!corrupt);
|
||||
|
||||
assert_eq!(
|
||||
blobs[1].read().unwrap().meta,
|
||||
data_blobs[block_start_idx + 1].read().unwrap().meta
|
||||
);
|
||||
assert_eq!(
|
||||
blobs[1].read().unwrap().data(),
|
||||
data_blobs[block_start_idx + 1].read().unwrap().data()
|
||||
);
|
||||
assert_eq!(
|
||||
blobs[0].read().unwrap().meta,
|
||||
data_blobs[block_start_idx].read().unwrap().meta
|
||||
);
|
||||
assert_eq!(
|
||||
blobs[0].read().unwrap().data(),
|
||||
data_blobs[block_start_idx].read().unwrap().data()
|
||||
);
|
||||
assert_eq!(
|
||||
blobs[NUM_DATA].read().unwrap().data(),
|
||||
coding[0].read().unwrap().data()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Temporary function used in tests to generate a database ledger
|
||||
// from the window (which is used to generate the erasure coding)
|
||||
// until we also transition generate_coding() and BroadcastStage to use DbLedger
|
||||
// Github issue: https://github.com/solana-labs/solana/issues/1899.
|
||||
pub fn generate_db_ledger_from_window(
|
||||
ledger_path: &str,
|
||||
window: &[WindowSlot],
|
||||
use_random: bool,
|
||||
) -> DbLedger {
|
||||
let db_ledger =
|
||||
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
|
||||
for slot in window {
|
||||
if let Some(ref data) = slot.data {
|
||||
// If we're using gibberish blobs, skip validation checks and insert
|
||||
// directly into the ledger
|
||||
if use_random {
|
||||
let data = data.read().unwrap();
|
||||
db_ledger
|
||||
.put_data_blob_bytes(
|
||||
data.slot().unwrap(),
|
||||
data.index().unwrap(),
|
||||
&data.data[..data.data_size().unwrap() as usize],
|
||||
)
|
||||
.expect("Expected successful put into data column of ledger");
|
||||
} else {
|
||||
db_ledger
|
||||
.write_shared_blobs(vec![data].into_iter())
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref coding) = slot.coding {
|
||||
let coding_lock = coding.read().unwrap();
|
||||
|
||||
let index = coding_lock
|
||||
.index()
|
||||
.expect("Expected coding blob to have valid index");
|
||||
|
||||
let data_size = coding_lock
|
||||
.size()
|
||||
.expect("Expected coding blob to have valid data size");
|
||||
|
||||
db_ledger
|
||||
.put_coding_blob_bytes(
|
||||
coding_lock.slot().unwrap(),
|
||||
index,
|
||||
&coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
db_ledger
|
||||
}
|
||||
|
||||
fn generate_coding(
|
||||
id: &Pubkey,
|
||||
window: &mut [WindowSlot],
|
||||
receive_index: u64,
|
||||
num_blobs: usize,
|
||||
transmit_index_coding: &mut u64,
|
||||
) -> Result<()> {
|
||||
) -> Result<()> {
|
||||
// beginning of the coding blobs of the block that receive_index points into
|
||||
let coding_index_start =
|
||||
receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64;
|
||||
|
@ -326,7 +797,8 @@ pub fn generate_coding(
|
|||
})
|
||||
.collect();
|
||||
|
||||
let mut coding_locks: Vec<_> = coding_blobs.iter().map(|b| b.write().unwrap()).collect();
|
||||
let mut coding_locks: Vec<_> =
|
||||
coding_blobs.iter().map(|b| b.write().unwrap()).collect();
|
||||
|
||||
let mut coding_ptrs: Vec<_> = coding_locks
|
||||
.iter_mut()
|
||||
|
@ -345,322 +817,6 @@ pub fn generate_coding(
|
|||
block_start = block_end;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Recover the missing data and coding blobs from the input ledger. Returns a vector
|
||||
// of the recovered missing data blobs and a vector of the recovered coding blobs
|
||||
pub fn recover(
|
||||
db_ledger: &Arc<DbLedger>,
|
||||
slot: u64,
|
||||
start_idx: u64,
|
||||
) -> Result<(Vec<SharedBlob>, Vec<SharedBlob>)> {
|
||||
let block_start_idx = start_idx - (start_idx % NUM_DATA as u64);
|
||||
|
||||
debug!("block_start_idx: {}", block_start_idx);
|
||||
|
||||
let coding_start_idx = block_start_idx + NUM_DATA as u64 - NUM_CODING as u64;
|
||||
let block_end_idx = block_start_idx + NUM_DATA as u64;
|
||||
trace!(
|
||||
"recover: coding_start_idx: {} block_end_idx: {}",
|
||||
coding_start_idx,
|
||||
block_end_idx
|
||||
);
|
||||
|
||||
let data_missing = db_ledger
|
||||
.find_missing_data_indexes(slot, block_start_idx, block_end_idx, NUM_DATA)
|
||||
.len();
|
||||
let coding_missing = db_ledger
|
||||
.find_missing_coding_indexes(slot, coding_start_idx, block_end_idx, NUM_CODING)
|
||||
.len();
|
||||
|
||||
// if we're not missing data, or if we have too much missing but have enough coding
|
||||
if data_missing == 0 {
|
||||
// nothing to do...
|
||||
return Ok((vec![], vec![]));
|
||||
}
|
||||
|
||||
if (data_missing + coding_missing) > NUM_CODING {
|
||||
trace!(
|
||||
"recover: start: {} skipping recovery data: {} coding: {}",
|
||||
block_start_idx,
|
||||
data_missing,
|
||||
coding_missing
|
||||
);
|
||||
// nothing to do...
|
||||
return Err(Error::ErasureError(ErasureError::NotEnoughBlocksToDecode));
|
||||
}
|
||||
|
||||
trace!(
|
||||
"recover: recovering: data: {} coding: {}",
|
||||
data_missing,
|
||||
coding_missing
|
||||
);
|
||||
|
||||
let mut blobs: Vec<SharedBlob> = Vec::with_capacity(NUM_DATA + NUM_CODING);
|
||||
let mut erasures: Vec<i32> = Vec::with_capacity(NUM_CODING);
|
||||
|
||||
let mut missing_data: Vec<SharedBlob> = vec![];
|
||||
let mut missing_coding: Vec<SharedBlob> = vec![];
|
||||
let mut size = None;
|
||||
|
||||
// Add the data blobs we have into the recovery vector, mark the missing ones
|
||||
for i in block_start_idx..block_end_idx {
|
||||
let result = db_ledger.get_data_blob_bytes(slot, i)?;
|
||||
|
||||
categorize_blob(
|
||||
&result,
|
||||
&mut blobs,
|
||||
&mut missing_data,
|
||||
&mut erasures,
|
||||
(i - block_start_idx) as i32,
|
||||
)?;
|
||||
}
|
||||
|
||||
// Add the coding blobs we have into the recovery vector, mark the missing ones
|
||||
for i in coding_start_idx..block_end_idx {
|
||||
let result = db_ledger.get_coding_blob_bytes(slot, i)?;
|
||||
|
||||
categorize_blob(
|
||||
&result,
|
||||
&mut blobs,
|
||||
&mut missing_coding,
|
||||
&mut erasures,
|
||||
((i - coding_start_idx) + NUM_DATA as u64) as i32,
|
||||
)?;
|
||||
|
||||
if let Some(b) = result {
|
||||
if size.is_none() {
|
||||
size = Some(b.len() - BLOB_HEADER_SIZE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Due to check (data_missing + coding_missing) > NUM_CODING from earlier in this function,
|
||||
// we know at least one coding block must exist, so "size" will not remain None after the
|
||||
// below processing.
|
||||
let size = size.unwrap();
|
||||
// marks end of erasures
|
||||
erasures.push(-1);
|
||||
trace!("erasures[]:{:?} data_size: {}", erasures, size,);
|
||||
|
||||
let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
|
||||
{
|
||||
let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
|
||||
let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
|
||||
|
||||
for b in &blobs {
|
||||
locks.push(b.write().unwrap());
|
||||
}
|
||||
|
||||
for (i, l) in locks.iter_mut().enumerate() {
|
||||
if i < NUM_DATA {
|
||||
data_ptrs.push(&mut l.data[..size]);
|
||||
} else {
|
||||
coding_ptrs.push(&mut l.data_mut()[..size]);
|
||||
}
|
||||
}
|
||||
|
||||
// Decode the blocks
|
||||
decode_blocks(
|
||||
data_ptrs.as_mut_slice(),
|
||||
coding_ptrs.as_mut_slice(),
|
||||
&erasures,
|
||||
)?;
|
||||
}
|
||||
|
||||
// Create the missing blobs from the reconstructed data
|
||||
let mut corrupt = false;
|
||||
|
||||
for i in &erasures[..erasures.len() - 1] {
|
||||
let n = *i as usize;
|
||||
let mut idx = n as u64 + block_start_idx;
|
||||
|
||||
let mut data_size;
|
||||
if n < NUM_DATA {
|
||||
data_size = locks[n].data_size().unwrap() as usize;
|
||||
data_size -= BLOB_HEADER_SIZE;
|
||||
if data_size > BLOB_DATA_SIZE {
|
||||
error!("corrupt data blob[{}] data_size: {}", idx, data_size);
|
||||
corrupt = true;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
data_size = size;
|
||||
idx -= NUM_CODING as u64;
|
||||
locks[n].set_slot(slot).unwrap();
|
||||
locks[n].set_index(idx).unwrap();
|
||||
|
||||
if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
|
||||
error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
|
||||
corrupt = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
locks[n].set_size(data_size);
|
||||
trace!(
|
||||
"erasures[{}] ({}) size: {} data[0]: {}",
|
||||
*i,
|
||||
idx,
|
||||
data_size,
|
||||
locks[n].data()[0]
|
||||
);
|
||||
}
|
||||
|
||||
if corrupt {
|
||||
// Remove the corrupted coding blobs so there's no effort wasted in trying to
|
||||
// reconstruct the blobs again
|
||||
for i in coding_start_idx..block_end_idx {
|
||||
db_ledger.delete_coding_blob(slot, i)?;
|
||||
}
|
||||
return Ok((vec![], vec![]));
|
||||
}
|
||||
|
||||
Ok((missing_data, missing_coding))
|
||||
}
|
||||
|
||||
fn categorize_blob(
|
||||
get_blob_result: &Option<Vec<u8>>,
|
||||
blobs: &mut Vec<SharedBlob>,
|
||||
missing: &mut Vec<SharedBlob>,
|
||||
erasures: &mut Vec<i32>,
|
||||
erasure_index: i32,
|
||||
) -> Result<()> {
|
||||
match get_blob_result {
|
||||
Some(b) => {
|
||||
if b.len() <= BLOB_HEADER_SIZE || b.len() > BLOB_SIZE {
|
||||
return Err(Error::ErasureError(ErasureError::InvalidBlobData));
|
||||
}
|
||||
blobs.push(Arc::new(RwLock::new(Blob::new(&b))));
|
||||
}
|
||||
None => {
|
||||
// Mark the missing memory
|
||||
erasures.push(erasure_index);
|
||||
let b = SharedBlob::default();
|
||||
blobs.push(b.clone());
|
||||
missing.push(b);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use super::*;
|
||||
use crate::db_ledger::get_tmp_ledger_path;
|
||||
use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT};
|
||||
use crate::entry::{make_tiny_test_entries, EntrySlice};
|
||||
|
||||
use crate::packet::{index_blobs, SharedBlob, BLOB_DATA_SIZE, BLOB_SIZE};
|
||||
use crate::window::WindowSlot;
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[test]
|
||||
pub fn test_coding() {
|
||||
let zero_vec = vec![0; 16];
|
||||
let mut vs: Vec<Vec<u8>> = (0..4).map(|i| (i..(16 + i)).collect()).collect();
|
||||
let v_orig: Vec<u8> = vs[0].clone();
|
||||
|
||||
let m = 2;
|
||||
let mut coding_blocks: Vec<_> = (0..m).map(|_| vec![0u8; 16]).collect();
|
||||
|
||||
{
|
||||
let mut coding_blocks_slices: Vec<_> =
|
||||
coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect();
|
||||
let v_slices: Vec<_> = vs.iter().map(|x| x.as_slice()).collect();
|
||||
|
||||
assert!(generate_coding_blocks(
|
||||
coding_blocks_slices.as_mut_slice(),
|
||||
v_slices.as_slice(),
|
||||
)
|
||||
.is_ok());
|
||||
}
|
||||
trace!("coding blocks:");
|
||||
for b in &coding_blocks {
|
||||
trace!("{:?}", b);
|
||||
}
|
||||
let erasure: i32 = 1;
|
||||
let erasures = vec![erasure, -1];
|
||||
// clear an entry
|
||||
vs[erasure as usize].copy_from_slice(zero_vec.as_slice());
|
||||
|
||||
{
|
||||
let mut coding_blocks_slices: Vec<_> =
|
||||
coding_blocks.iter_mut().map(|x| x.as_mut_slice()).collect();
|
||||
let mut v_slices: Vec<_> = vs.iter_mut().map(|x| x.as_mut_slice()).collect();
|
||||
|
||||
assert!(decode_blocks(
|
||||
v_slices.as_mut_slice(),
|
||||
coding_blocks_slices.as_mut_slice(),
|
||||
erasures.as_slice(),
|
||||
)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
trace!("vs:");
|
||||
for v in &vs {
|
||||
trace!("{:?}", v);
|
||||
}
|
||||
assert_eq!(v_orig, vs[0]);
|
||||
}
|
||||
|
||||
// TODO: Temporary function used in tests to generate a database ledger
|
||||
// from the window (which is used to generate the erasure coding)
|
||||
// until we also transition generate_coding() and BroadcastStage to use DbLedger
|
||||
// Github issue: https://github.com/solana-labs/solana/issues/1899.
|
||||
pub fn generate_db_ledger_from_window(
|
||||
ledger_path: &str,
|
||||
window: &[WindowSlot],
|
||||
use_random: bool,
|
||||
) -> DbLedger {
|
||||
let db_ledger =
|
||||
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
|
||||
for slot in window {
|
||||
if let Some(ref data) = slot.data {
|
||||
// If we're using gibberish blobs, skip validation checks and insert
|
||||
// directly into the ledger
|
||||
if use_random {
|
||||
let data = data.read().unwrap();
|
||||
db_ledger
|
||||
.put_data_blob_bytes(
|
||||
data.slot().unwrap(),
|
||||
data.index().unwrap(),
|
||||
&data.data[..data.data_size().unwrap() as usize],
|
||||
)
|
||||
.expect("Expected successful put into data column of ledger");
|
||||
} else {
|
||||
db_ledger
|
||||
.write_shared_blobs(vec![data].into_iter())
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref coding) = slot.coding {
|
||||
let coding_lock = coding.read().unwrap();
|
||||
|
||||
let index = coding_lock
|
||||
.index()
|
||||
.expect("Expected coding blob to have valid index");
|
||||
|
||||
let data_size = coding_lock
|
||||
.size()
|
||||
.expect("Expected coding blob to have valid data size");
|
||||
|
||||
db_ledger
|
||||
.put_coding_blob_bytes(
|
||||
coding_lock.slot().unwrap(),
|
||||
index,
|
||||
&coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE],
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
db_ledger
|
||||
}
|
||||
|
||||
pub fn setup_window_ledger(
|
||||
|
@ -740,12 +896,14 @@ pub mod test {
|
|||
blobs.push(b_);
|
||||
}
|
||||
|
||||
{
|
||||
// Make some dummy slots
|
||||
let slot_tick_heights: Vec<(&SharedBlob, u64)> =
|
||||
blobs.iter().zip(vec![slot; blobs.len()]).collect();
|
||||
index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64);
|
||||
}
|
||||
index_blobs(
|
||||
&blobs,
|
||||
&Keypair::new().pubkey(),
|
||||
offset as u64,
|
||||
&vec![slot; blobs.len()],
|
||||
);
|
||||
|
||||
for b in blobs {
|
||||
let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
|
||||
|
||||
|
@ -754,6 +912,18 @@ pub mod test {
|
|||
window
|
||||
}
|
||||
|
||||
fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
|
||||
let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs();
|
||||
|
||||
index_blobs(
|
||||
&blobs,
|
||||
&Keypair::new().pubkey(),
|
||||
offset as u64,
|
||||
&vec![DEFAULT_SLOT_HEIGHT; blobs.len()],
|
||||
);
|
||||
blobs
|
||||
}
|
||||
|
||||
fn generate_entry_window(offset: usize, num_blobs: usize) -> Vec<WindowSlot> {
|
||||
let mut window = vec![
|
||||
WindowSlot {
|
||||
|
@ -763,17 +933,8 @@ pub mod test {
|
|||
};
|
||||
WINDOW_SIZE
|
||||
];
|
||||
let entries = make_tiny_test_entries(num_blobs);
|
||||
let blobs = entries.to_shared_blobs();
|
||||
|
||||
{
|
||||
// Make some dummy slots
|
||||
let slot_tick_heights: Vec<(&SharedBlob, u64)> = blobs
|
||||
.iter()
|
||||
.zip(vec![DEFAULT_SLOT_HEIGHT; blobs.len()])
|
||||
.collect();
|
||||
index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64);
|
||||
}
|
||||
let blobs = generate_test_blobs(offset, num_blobs);
|
||||
|
||||
for b in blobs.into_iter() {
|
||||
let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
|
||||
|
|
|
@ -14,7 +14,6 @@ use crate::tpu::{Tpu, TpuReturnType};
use crate::tpu_forwarder::TpuForwarder;
use crate::tvu::{Sockets, Tvu, TvuReturnType};
use crate::vote_signer_proxy::VoteSignerProxy;
use crate::window::{new_window, SharedWindow};
use log::Level;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
@ -98,7 +97,6 @@ pub struct Fullnode {
|
|||
bank: Arc<Bank>,
|
||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||
sigverify_disabled: bool,
|
||||
shared_window: SharedWindow,
|
||||
tvu_sockets: Vec<UdpSocket>,
|
||||
repair_socket: UdpSocket,
|
||||
retransmit_socket: UdpSocket,
|
||||
|
@ -204,8 +202,6 @@ impl Fullnode {
|
|||
|
||||
let db_ledger = db_ledger.unwrap_or_else(|| Self::make_db_ledger(ledger_path));
|
||||
|
||||
let window = new_window(32 * 1024);
|
||||
let shared_window = Arc::new(RwLock::new(window));
|
||||
node.info.wallclock = timestamp();
|
||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_keypair(
|
||||
node.info,
|
||||
|
@ -315,7 +311,6 @@ impl Fullnode {
|
|||
.try_clone()
|
||||
.expect("Failed to clone broadcast socket"),
|
||||
cluster_info.clone(),
|
||||
shared_window.clone(),
|
||||
entry_height,
|
||||
bank.leader_scheduler.clone(),
|
||||
entry_receiver,
|
||||
|
@ -331,7 +326,6 @@ impl Fullnode {
|
|||
Fullnode {
|
||||
keypair,
|
||||
cluster_info,
|
||||
shared_window,
|
||||
bank,
|
||||
sigverify_disabled,
|
||||
gossip_service,
|
||||
|
@ -487,7 +481,6 @@ impl Fullnode {
|
|||
.try_clone()
|
||||
.expect("Failed to clone broadcast socket"),
|
||||
self.cluster_info.clone(),
|
||||
self.shared_window.clone(),
|
||||
entry_height,
|
||||
self.bank.leader_scheduler.clone(),
|
||||
blob_receiver,
|
||||
|
|
|
@ -70,6 +70,7 @@ pub mod tpu;
pub mod tpu_forwarder;
pub mod tvu;
pub mod vote_signer_proxy;
#[cfg(test)]
pub mod window;
pub mod window_service;

@ -8,7 +8,6 @@ use log::Level;
use serde::Serialize;
pub use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey;
use std::borrow::Borrow;
use std::cmp;
use std::fmt;
use std::io;
@ -451,21 +450,14 @@ impl Blob {
}
}

pub fn index_blobs<I, J, K>(blobs: I, id: &Pubkey, mut index: u64)
where
I: IntoIterator<Item = J>,
J: Borrow<(K, u64)>,
K: Borrow<SharedBlob>,
{
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slots: &[u64]) {
// enumerate all the blobs, those are the indices
for b in blobs {
let (b, slot) = b.borrow();
let mut blob = b.borrow().write().unwrap();
for (blob, slot) in blobs.iter().zip(slots) {
let mut blob = blob.write().unwrap();

blob.set_index(index).expect("set_index");
blob.set_slot(*slot).expect("set_slot");
blob.set_id(id).expect("set_id");
blob.set_flags(0).unwrap();

index += 1;
}