remove window code from most places (#2389)

* remove window code from most places
* window used only for testing
* remove unnecessary clippy directives
Rob Walker 2019-01-14 12:11:55 -08:00 committed by GitHub
parent 8af61f561b
commit e3c0bd5a3f
8 changed files with 525 additions and 509 deletions
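
The heart of the change: BroadcastService no longer keeps a shared in-memory window. Entries are turned into blobs, indexed against their slot heights, written straight to DbLedger, and erasure coding comes from the new stateful CodingGenerator instead of erasure::generate_coding over window slots. The sketch below is an editorial composite of the new path assembled from this diff (types like NodeInfo and the EntrySlice trait are assumed to be in scope as elsewhere in the diff; setup, entry batching, the slot lookup, and metrics are elided, so treat it as an illustration rather than the exact Broadcast::run body):

use crate::cluster_info::{ClusterInfo, NodeInfo};
use crate::db_ledger::DbLedger;
use crate::entry::{Entry, EntrySlice};
use crate::erasure::CodingGenerator;
use crate::packet::index_blobs;
use crate::result::Result;
use solana_sdk::pubkey::Pubkey;
use std::net::UdpSocket;
use std::sync::Arc;

// One simplified pass of the new broadcast loop.
fn broadcast_once(
    id: &Pubkey,
    blob_index: u64,
    entries: &[Entry],
    slots: &[u64],
    broadcast_table: &[NodeInfo],
    sock: &UdpSocket,
    db_ledger: &Arc<DbLedger>,
    coding_generator: &mut CodingGenerator,
) -> Result<u64> {
    // entries -> blobs, indexed by (id, running blob_index, slot) rather than by window position
    let blobs = entries.to_shared_blobs();
    index_blobs(&blobs, id, blob_index, slots);

    // the ledger now plays the role the window used to play as the blob cache
    db_ledger
        .write_consecutive_blobs(&blobs)
        .expect("Unrecoverable failure to write to database");

    // data blobs go out directly, with no transmit/receive index bookkeeping
    // (false stands in for the last-tick flag computed in the real code)
    ClusterInfo::broadcast(id, false, broadcast_table, sock, &blobs)?;

    // coding blobs are produced per NUM_DATA chunk by the generator
    // (this part sits behind the "erasure" feature in the real code)
    let coding = coding_generator.next(&blobs)?;
    ClusterInfo::broadcast(id, false, broadcast_table, sock, &coding)?;

    // coding blobs do not advance the data blob index
    Ok(blob_index + blobs.len() as u64)
}

Broadcast::run in the diff below does essentially this, plus batching of received entries, slot generation via the leader scheduler, and metrics.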


@@ -7,12 +7,11 @@ use crate::db_ledger::DbLedger;
use crate::entry::Entry;
use crate::entry::EntrySlice;
#[cfg(feature = "erasure")]
use crate::erasure;
use crate::erasure::CodingGenerator;
use crate::leader_scheduler::LeaderScheduler;
use crate::packet::{index_blobs, SharedBlob};
use crate::packet::index_blobs;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::window::{SharedWindow, WindowIndex, WindowUtil};
use log::Level;
use rayon::prelude::*;
use solana_metrics::{influxdb, submit};
@@ -32,160 +31,110 @@ pub enum BroadcastServiceReturnType {
ExitSignal,
}
#[allow(clippy::too_many_arguments)]
fn broadcast(
db_ledger: &Arc<DbLedger>,
struct Broadcast {
id: Pubkey,
max_tick_height: Option<u64>,
leader_id: Pubkey,
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
receiver: &Receiver<Vec<Entry>>,
sock: &UdpSocket,
transmit_index: &mut WindowIndex,
receive_index: &mut u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> {
let id = node_info.id;
let timer = Duration::new(1, 0);
let entries = receiver.recv_timeout(timer)?;
let now = Instant::now();
let mut num_entries = entries.len();
let mut ventries = Vec::new();
ventries.push(entries);
blob_index: u64,
let mut contains_last_tick = false;
while let Ok(entries) = receiver.try_recv() {
num_entries += entries.len();
#[cfg(feature = "erasure")]
coding_generator: CodingGenerator,
}
impl Broadcast {
fn run(
&mut self,
broadcast_table: &[NodeInfo],
receiver: &Receiver<Vec<Entry>>,
sock: &UdpSocket,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
db_ledger: &Arc<DbLedger>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let entries = receiver.recv_timeout(timer)?;
let now = Instant::now();
let mut num_entries = entries.len();
let mut ventries = Vec::new();
ventries.push(entries);
}
if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) {
contains_last_tick |= Some(last.tick_height) == max_tick_height;
}
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
let to_blobs_start = Instant::now();
// Generate the slot heights for all the entries inside ventries
let slot_heights = generate_slots(&ventries, leader_scheduler);
let blobs: Vec<_> = ventries
.into_par_iter()
.flat_map(|p| p.to_shared_blobs())
.collect();
let blobs_slot_heights: Vec<(SharedBlob, u64)> = blobs.into_iter().zip(slot_heights).collect();
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
let blobs_chunking = Instant::now();
// We could receive more blobs than window slots so
// break them up into window-sized chunks to process
let window_size = window.read().unwrap().window_size();
let blobs_chunked = blobs_slot_heights
.chunks(window_size as usize)
.map(|x| x.to_vec());
let chunking_elapsed = duration_as_ms(&blobs_chunking.elapsed());
let broadcast_start = Instant::now();
for blobs in blobs_chunked {
let blobs_len = blobs.len();
trace!("{}: broadcast blobs.len: {}", id, blobs_len);
index_blobs(blobs.iter(), &node_info.id, *receive_index);
// keep the cache of blobs that are broadcast
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
{
let mut win = window.write().unwrap();
assert!(blobs.len() <= win.len());
let blobs: Vec<_> = blobs.into_iter().map(|(b, _)| b).collect();
for b in &blobs {
let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize;
if let Some(x) = win[pos].data.take() {
trace!(
"{} popped {} at {}",
id,
x.read().unwrap().index().unwrap(),
pos
);
}
if let Some(x) = win[pos].coding.take() {
trace!(
"{} popped {} at {}",
id,
x.read().unwrap().index().unwrap(),
pos
);
}
trace!("{} null {}", id, pos);
}
for b in &blobs {
{
let ix = b.read().unwrap().index().expect("blob index");
let pos = (ix % window_size) as usize;
trace!("{} caching {} at {}", id, ix, pos);
assert!(win[pos].data.is_none());
win[pos].data = Some(b.clone());
}
}
db_ledger
.write_consecutive_blobs(&blobs)
.expect("Unrecoverable failure to write to database");
while let Ok(entries) = receiver.try_recv() {
num_entries += entries.len();
ventries.push(entries);
}
let last_tick = match self.max_tick_height {
Some(max_tick_height) => {
if let Some(Some(last)) = ventries.last().map(|entries| entries.last()) {
last.tick_height == max_tick_height
} else {
false
}
}
None => false,
};
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
let to_blobs_start = Instant::now();
// Generate the slot heights for all the entries inside ventries
// this may span slots if this leader broadcasts for consecutive slots...
let slots = generate_slots(&ventries, leader_scheduler);
let blobs: Vec<_> = ventries
.into_par_iter()
.flat_map(|p| p.to_shared_blobs())
.collect();
// TODO: blob_index should be slot-relative...
index_blobs(&blobs, &self.id, self.blob_index, &slots);
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
let broadcast_start = Instant::now();
inc_new_counter_info!("streamer-broadcast-sent", blobs.len());
db_ledger
.write_consecutive_blobs(&blobs)
.expect("Unrecoverable failure to write to database");
// don't count coding blobs in the blob indexes
self.blob_index += blobs.len() as u64;
// Send out data
ClusterInfo::broadcast(&self.id, last_tick, &broadcast_table, sock, &blobs)?;
// Fill in the coding blob data from the window data blobs
#[cfg(feature = "erasure")]
{
erasure::generate_coding(
&id,
&mut window.write().unwrap(),
*receive_index,
blobs_len,
&mut transmit_index.coding,
)?;
let coding = self.coding_generator.next(&blobs)?;
// send out erasures
ClusterInfo::broadcast(&self.id, false, &broadcast_table, sock, &coding)?;
}
*receive_index += blobs_len as u64;
let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
// Send blobs out from the window
ClusterInfo::broadcast(
contains_last_tick,
leader_id,
&node_info,
&broadcast_table,
&window,
&sock,
transmit_index,
*receive_index,
)?;
inc_new_counter_info!(
"broadcast_service-time_ms",
duration_as_ms(&now.elapsed()) as usize
);
info!(
"broadcast: {} entries, blob time {} broadcast time {}",
num_entries, to_blobs_elapsed, broadcast_elapsed
);
submit(
influxdb::Point::new("broadcast-service")
.add_field(
"transmit-index",
influxdb::Value::Integer(self.blob_index as i64),
)
.to_owned(),
);
Ok(())
}
let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
inc_new_counter_info!(
"broadcast_service-time_ms",
duration_as_ms(&now.elapsed()) as usize
);
info!(
"broadcast: {} entries, blob time {} chunking time {} broadcast time {}",
num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed
);
submit(
influxdb::Point::new("broadcast-service")
.add_field(
"transmit-index",
influxdb::Value::Integer(transmit_index.data as i64),
)
.to_owned(),
);
Ok(())
}
fn generate_slots(
@@ -240,46 +189,41 @@ pub struct BroadcastService {
}
impl BroadcastService {
#[allow(clippy::too_many_arguments)]
fn run(
db_ledger: &Arc<DbLedger>,
bank: &Arc<Bank>,
sock: &UdpSocket,
cluster_info: &Arc<RwLock<ClusterInfo>>,
window: &SharedWindow,
entry_height: u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
receiver: &Receiver<Vec<Entry>>,
max_tick_height: Option<u64>,
exit_signal: &Arc<AtomicBool>,
) -> BroadcastServiceReturnType {
let mut transmit_index = WindowIndex {
data: entry_height,
coding: entry_height,
};
let mut receive_index = entry_height;
let me = cluster_info.read().unwrap().my_data().clone();
let mut broadcast = Broadcast {
id: me.id,
max_tick_height,
blob_index: entry_height,
#[cfg(feature = "erasure")]
coding_generator: CodingGenerator::new(),
};
loop {
if exit_signal.load(Ordering::Relaxed) {
return BroadcastServiceReturnType::ExitSignal;
}
let mut broadcast_table = cluster_info.read().unwrap().sorted_tvu_peers(&bank);
// Layer 1 nodes are limited to the fanout size.
// Layer 1, leader nodes are limited to the fanout size.
broadcast_table.truncate(DATA_PLANE_FANOUT);
inc_new_counter_info!("broadcast_service-num_peers", broadcast_table.len() + 1);
let leader_id = cluster_info.read().unwrap().leader_id();
if let Err(e) = broadcast(
db_ledger,
max_tick_height,
leader_id,
&me,
if let Err(e) = broadcast.run(
&broadcast_table,
&window,
&receiver,
&sock,
&mut transmit_index,
&mut receive_index,
receiver,
sock,
leader_scheduler,
db_ledger,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
@@ -311,13 +255,11 @@ impl BroadcastService {
/// WriteStage is the last stage in the pipeline), which will then close Broadcast service,
/// which will then close FetchStage in the Tpu, and then the rest of the Tpu,
/// completing the cycle.
#[allow(clippy::too_many_arguments)]
pub fn new(
db_ledger: Arc<DbLedger>,
bank: Arc<Bank>,
sock: UdpSocket,
cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow,
entry_height: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
receiver: Receiver<Vec<Entry>>,
@@ -334,7 +276,6 @@ impl BroadcastService {
&bank,
&sock,
&cluster_info,
&window,
entry_height,
&leader_scheduler,
&receiver,
@@ -364,7 +305,6 @@ mod test {
use crate::db_ledger::DbLedger;
use crate::entry::create_ticks;
use crate::service::Service;
use crate::window::new_window;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::atomic::AtomicBool;
@@ -401,8 +341,6 @@ mod test {
cluster_info.insert_info(broadcast_buddy.info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let window = new_window(32 * 1024);
let shared_window = Arc::new(RwLock::new(window));
let exit_sender = Arc::new(AtomicBool::new(false));
let bank = Arc::new(Bank::default());
@@ -412,7 +350,6 @@ mod test {
bank.clone(),
leader_info.sockets.broadcast,
cluster_info,
shared_window,
entry_height,
leader_scheduler,
entry_receiver,


@@ -25,7 +25,6 @@ use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
use crate::result::Result;
use crate::rpc::RPC_PORT;
use crate::streamer::{BlobReceiver, BlobSender};
use crate::window::{SharedWindow, WindowIndex};
use bincode::{deserialize, serialize};
use hashbrown::HashMap;
use log::Level;
@@ -493,58 +492,33 @@ impl ClusterInfo {
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
pub fn broadcast(
id: &Pubkey,
contains_last_tick: bool,
leader_id: Pubkey,
me: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
s: &UdpSocket,
transmit_index: &mut WindowIndex,
received_index: u64,
blobs: &[SharedBlob],
) -> Result<()> {
if broadcast_table.is_empty() {
debug!("{}:not enough peers in cluster_info table", me.id);
debug!("{}:not enough peers in cluster_info table", id);
inc_new_counter_info!("cluster_info-broadcast-not_enough_peers_error", 1);
Err(ClusterInfoError::NoPeers)?;
}
trace!(
"{} transmit_index: {:?} received_index: {} broadcast_len: {}",
me.id,
*transmit_index,
received_index,
broadcast_table.len()
);
let old_transmit_index = transmit_index.data;
let orders = Self::create_broadcast_orders(contains_last_tick, blobs, broadcast_table);
let orders = Self::create_broadcast_orders(
contains_last_tick,
window,
broadcast_table,
transmit_index,
received_index,
me,
);
trace!("broadcast orders table {}", orders.len());
let errs = Self::send_orders(s, orders, me, leader_id);
let errs = Self::send_orders(id, s, orders);
for e in errs {
if let Err(e) = &e {
trace!("broadcast result {:?}", e);
trace!("{}: broadcast result {:?}", id, e);
}
e?;
if transmit_index.data < received_index {
transmit_index.data += 1;
}
}
inc_new_counter_info!(
"cluster_info-broadcast-max_idx",
(transmit_index.data - old_transmit_index) as usize
);
transmit_index.coding = transmit_index.data;
inc_new_counter_info!("cluster_info-broadcast-max_idx", blobs.len());
Ok(())
}
@@ -603,19 +577,15 @@ impl ClusterInfo {
}
fn send_orders(
id: &Pubkey,
s: &UdpSocket,
orders: Vec<(Option<SharedBlob>, Vec<&NodeInfo>)>,
me: &NodeInfo,
leader_id: Pubkey,
orders: Vec<(SharedBlob, Vec<&NodeInfo>)>,
) -> Vec<io::Result<usize>> {
orders
.into_iter()
.flat_map(|(b, vs)| {
// only leader should be broadcasting
assert!(vs.iter().find(|info| info.id == leader_id).is_none());
let bl = b.unwrap();
let blob = bl.read().unwrap();
//TODO profile this, may need multiple sockets for par_iter
let blob = b.read().unwrap();
let ids_and_tvus = if log_enabled!(Level::Trace) {
let v_ids = vs.iter().map(|v| v.id);
let tvus = vs.iter().map(|v| v.tvu);
@@ -623,7 +593,7 @@ impl ClusterInfo {
trace!(
"{}: BROADCAST idx: {} sz: {} to {:?} coding: {}",
me.id,
id,
blob.index().unwrap(),
blob.meta.size,
ids_and_tvus,
@@ -642,7 +612,7 @@ impl ClusterInfo {
let e = s.send_to(&blob.data[..blob.meta.size], &v.tvu);
trace!(
"{}: done broadcast {} to {:?}",
me.id,
id,
blob.meta.size,
ids_and_tvus
);
@@ -656,70 +626,36 @@ impl ClusterInfo {
fn create_broadcast_orders<'a>(
contains_last_tick: bool,
window: &SharedWindow,
blobs: &[SharedBlob],
broadcast_table: &'a [NodeInfo],
transmit_index: &mut WindowIndex,
received_index: u64,
me: &NodeInfo,
) -> Vec<(Option<SharedBlob>, Vec<&'a NodeInfo>)> {
) -> Vec<(SharedBlob, Vec<&'a NodeInfo>)> {
// enumerate all the blobs in the window, those are the indices
// transmit them to nodes, starting from a different node.
let mut orders = Vec::with_capacity((received_index - transmit_index.data) as usize);
let window_l = window.read().unwrap();
let mut br_idx = transmit_index.data as usize % broadcast_table.len();
if blobs.is_empty() {
return vec![];
}
for idx in transmit_index.data..received_index {
let w_idx = idx as usize % window_l.len();
let mut orders = Vec::with_capacity(blobs.len());
trace!(
"{} broadcast order data w_idx {} br_idx {}",
me.id,
w_idx,
br_idx
);
for (i, blob) in blobs.iter().enumerate() {
let br_idx = i % broadcast_table.len();
trace!("broadcast order data br_idx {}", br_idx);
orders.push((blob.clone(), vec![&broadcast_table[br_idx]]));
}
if contains_last_tick {
// Broadcast the last tick to everyone on the network so it doesn't get dropped
// (Need to maximize probability the next leader in line sees this handoff tick
// despite packet drops)
let target = if idx == received_index - 1 && contains_last_tick {
// If we see a tick at max_tick_height, then we know it must be the last
// Blob in the window, at index == received_index. There cannot be an entry
// that got sent after the last tick, guaranteed by the PohService).
assert!(window_l[w_idx].data.is_some());
(
window_l[w_idx].data.clone(),
broadcast_table.iter().collect(),
)
} else {
(window_l[w_idx].data.clone(), vec![&broadcast_table[br_idx]])
};
orders.push(target);
br_idx += 1;
br_idx %= broadcast_table.len();
}
for idx in transmit_index.coding..received_index {
let w_idx = idx as usize % window_l.len();
// skip over empty slots
if window_l[w_idx].coding.is_none() {
continue;
}
trace!(
"{} broadcast order coding w_idx: {} br_idx :{}",
me.id,
w_idx,
br_idx,
);
// If we had a tick at max_tick_height, then we know it must be the last
// Blob in the broadcast, There cannot be an entry that got sent after the
// last tick, guaranteed by the PohService).
orders.push((
window_l[w_idx].coding.clone(),
vec![&broadcast_table[br_idx]],
blobs.last().unwrap().clone(),
broadcast_table.iter().collect(),
));
br_idx += 1;
br_idx %= broadcast_table.len();
}
orders


@@ -976,11 +976,7 @@ mod tests {
fn test_read_blobs_bytes() {
let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
let slot = DEFAULT_SLOT_HEIGHT;
index_blobs(
shared_blobs.iter().zip(vec![slot; 10].into_iter()),
&Keypair::new().pubkey(),
0,
);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), 0, &[slot; 10]);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();


@@ -562,9 +562,10 @@ mod test {
let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
index_blobs(
shared_blobs.iter().zip(vec![slot; num_entries].into_iter()),
&shared_blobs,
&Keypair::new().pubkey(),
0,
&vec![slot; num_entries],
);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
@@ -657,11 +658,10 @@ mod test {
let shared_blobs = original_entries.clone().to_shared_blobs();
index_blobs(
shared_blobs
.iter()
.zip(vec![DEFAULT_SLOT_HEIGHT; num_entries].into_iter()),
&shared_blobs,
&Keypair::new().pubkey(),
0,
&vec![DEFAULT_SLOT_HEIGHT; num_entries],
);
let mut consume_queue = vec![];


@@ -2,8 +2,6 @@
use crate::db_ledger::DbLedger;
use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE, BLOB_HEADER_SIZE, BLOB_SIZE};
use crate::result::{Error, Result};
use crate::window::WindowSlot;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::sync::{Arc, RwLock};
@@ -177,6 +175,79 @@ pub fn decode_blocks(
Ok(())
}
fn decode_blobs(
blobs: &[SharedBlob],
erasures: &[i32],
size: usize,
block_start_idx: u64,
slot: u64,
) -> Result<bool> {
let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
assert!(blobs.len() == NUM_DATA + NUM_CODING);
for b in blobs {
locks.push(b.write().unwrap());
}
for (i, l) in locks.iter_mut().enumerate() {
if i < NUM_DATA {
data_ptrs.push(&mut l.data[..size]);
} else {
coding_ptrs.push(&mut l.data_mut()[..size]);
}
}
// Decode the blocks
decode_blocks(
data_ptrs.as_mut_slice(),
coding_ptrs.as_mut_slice(),
&erasures,
)?;
// Create the missing blobs from the reconstructed data
let mut corrupt = false;
for i in &erasures[..erasures.len() - 1] {
let n = *i as usize;
let mut idx = n as u64 + block_start_idx;
let mut data_size;
if n < NUM_DATA {
data_size = locks[n].data_size().unwrap() as usize;
data_size -= BLOB_HEADER_SIZE;
if data_size > BLOB_DATA_SIZE {
error!("corrupt data blob[{}] data_size: {}", idx, data_size);
corrupt = true;
break;
}
} else {
data_size = size;
idx -= NUM_CODING as u64;
locks[n].set_slot(slot).unwrap();
locks[n].set_index(idx).unwrap();
if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
corrupt = true;
break;
}
}
locks[n].set_size(data_size);
trace!(
"erasures[{}] ({}) size: {} data[0]: {}",
*i,
idx,
data_size,
locks[n].data()[0]
);
}
Ok(corrupt)
}
// Generate coding blocks in window starting from start_idx,
// for num_blobs.. For each block place the coding blobs
// at the end of the block like so:
@@ -214,137 +285,80 @@ pub fn decode_blocks(
//
//
//
pub fn generate_coding(
id: &Pubkey,
window: &mut [WindowSlot],
receive_index: u64,
num_blobs: usize,
transmit_index_coding: &mut u64,
) -> Result<()> {
// beginning of the coding blobs of the block that receive_index points into
let coding_index_start =
receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64;
pub struct CodingGenerator {
leftover: Vec<SharedBlob>, // SharedBlobs that couldn't be used in last call to next()
}
let start_idx = receive_index as usize % window.len();
let mut block_start = start_idx - (start_idx % NUM_DATA);
loop {
let block_end = block_start + NUM_DATA;
if block_end > (start_idx + num_blobs) {
break;
impl CodingGenerator {
pub fn new() -> Self {
Self {
leftover: Vec::with_capacity(NUM_DATA),
}
info!(
"generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}",
id, block_start, block_end, start_idx, num_blobs
);
let mut max_data_size = 0;
// find max_data_size, maybe bail if not all the data is here
for i in block_start..block_end {
let n = i % window.len();
trace!("{} window[{}] = {:?}", id, n, window[n].data);
if let Some(b) = &window[n].data {
max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size);
} else {
trace!("{} data block is null @ {}", id, n);
return Ok(());
}
}
// round up to the nearest jerasure alignment
max_data_size = align!(max_data_size, JERASURE_ALIGN);
let mut data_blobs = Vec::with_capacity(NUM_DATA);
for i in block_start..block_end {
let n = i % window.len();
if let Some(b) = &window[n].data {
// make sure extra bytes in each blob are zero-d out for generation of
// coding blobs
let mut b_wl = b.write().unwrap();
for i in b_wl.meta.size..max_data_size {
b_wl.data[i] = 0;
}
data_blobs.push(b);
}
}
// getting ready to do erasure coding, means that we're potentially
// going back in time, tell our caller we've inserted coding blocks
// starting at coding_index_start
*transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start);
let mut coding_blobs = Vec::with_capacity(NUM_CODING);
let coding_start = block_end - NUM_CODING;
for i in coding_start..block_end {
let n = i % window.len();
assert!(window[n].coding.is_none());
window[n].coding = Some(SharedBlob::default());
let coding = window[n].coding.clone().unwrap();
let mut coding_wl = coding.write().unwrap();
for i in 0..max_data_size {
coding_wl.data[i] = 0;
}
// copy index and id from the data blob
if let Some(data) = &window[n].data {
let data_rl = data.read().unwrap();
let index = data_rl.index().unwrap();
let slot = data_rl.slot().unwrap();
let id = data_rl.id().unwrap();
trace!(
"{} copying index {} id {:?} from data to coding",
id,
index,
id
);
coding_wl.set_index(index).unwrap();
coding_wl.set_slot(slot).unwrap();
coding_wl.set_id(&id).unwrap();
}
coding_wl.set_size(max_data_size);
if coding_wl.set_coding().is_err() {
return Err(Error::ErasureError(ErasureError::EncodeError));
}
coding_blobs.push(coding.clone());
}
let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
let data_ptrs: Vec<_> = data_locks
.iter()
.enumerate()
.map(|(i, l)| {
trace!("{} i: {} data: {}", id, i, l.data[0]);
&l.data[..max_data_size]
})
.collect();
let mut coding_locks: Vec<_> = coding_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut coding_ptrs: Vec<_> = coding_locks
.iter_mut()
.enumerate()
.map(|(i, l)| {
trace!("{} i: {} coding: {}", id, i, l.data[0],);
&mut l.data_mut()[..max_data_size]
})
.collect();
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!(
"{} start_idx: {} data: {}:{} coding: {}:{}",
id, start_idx, block_start, block_end, coding_start, block_end
);
block_start = block_end;
}
Ok(())
// must be called with consecutive data blobs from previous invocation
pub fn next(&mut self, next_data: &[SharedBlob]) -> Result<Vec<SharedBlob>> {
let mut next_coding =
Vec::with_capacity((self.leftover.len() + next_data.len()) / NUM_DATA * NUM_CODING);
let next_data: Vec<_> = self.leftover.iter().chain(next_data).cloned().collect();
for data_blobs in next_data.chunks(NUM_DATA) {
if data_blobs.len() < NUM_DATA {
self.leftover = data_blobs.to_vec();
break;
}
self.leftover.clear();
// find max_data_size for the chunk
let max_data_size = align!(
data_blobs
.iter()
.fold(0, |max, blob| cmp::max(blob.read().unwrap().meta.size, max)),
JERASURE_ALIGN
);
let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
let data_ptrs: Vec<_> = data_locks
.iter()
.map(|l| &l.data[..max_data_size])
.collect();
let mut coding_blobs = Vec::with_capacity(NUM_CODING);
for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] {
let index = data_blob.index().unwrap();
let slot = data_blob.slot().unwrap();
let id = data_blob.id().unwrap();
let coding_blob = SharedBlob::default();
{
let mut coding_blob = coding_blob.write().unwrap();
coding_blob.set_index(index).unwrap();
coding_blob.set_slot(slot).unwrap();
coding_blob.set_id(&id).unwrap();
coding_blob.set_size(max_data_size);
coding_blob.set_coding().unwrap();
}
coding_blobs.push(coding_blob);
}
{
let mut coding_locks: Vec<_> =
coding_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut coding_ptrs: Vec<_> = coding_locks
.iter_mut()
.map(|l| &mut l.data_mut()[..max_data_size])
.collect();
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
}
next_coding.append(&mut coding_blobs);
}
Ok(next_coding)
}
}
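
(Editorial aside, not part of the diff: the generator above is meant to be fed consecutive, already-indexed data blobs; it emits NUM_CODING coding blobs for each complete NUM_DATA chunk and buffers any partial chunk in `leftover` for the next call. A minimal usage sketch, assuming the erasure feature is enabled and `data_blobs` is a slice of indexed data blobs such as generate_test_blobs() produces in the tests below:)

fn coding_for(data_blobs: &[SharedBlob]) -> Result<Vec<SharedBlob>> {
    let mut coding_generator = CodingGenerator::new();
    let mut coding = Vec::new();
    // batch size is arbitrary; boundaries need not line up with NUM_DATA
    for batch in data_blobs.chunks(4) {
        coding.extend(coding_generator.next(batch)?);
    }
    Ok(coding)
}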
// Recover the missing data and coding blobs from the input ledger. Returns a vector
@@ -401,7 +415,6 @@ pub fn recover(
let mut missing_data: Vec<SharedBlob> = vec![];
let mut missing_coding: Vec<SharedBlob> = vec![];
let mut size = None;
// Add the data blobs we have into the recovery vector, mark the missing ones
for i in block_start_idx..block_end_idx {
@@ -416,6 +429,7 @@ pub fn recover(
)?;
}
let mut size = None;
// Add the coding blobs we have into the recovery vector, mark the missing ones
for i in coding_start_idx..block_end_idx {
let result = db_ledger.get_coding_blob_bytes(slot, i)?;
@@ -434,78 +448,17 @@ pub fn recover(
}
}
}
// Due to check (data_missing + coding_missing) > NUM_CODING from earlier in this function,
// we know at least one coding block must exist, so "size" will not remain None after the
// below processing.
// Due to checks above verifying that (data_missing + coding_missing) <= NUM_CODING and
// data_missing > 0, we know at least one coding block must exist, so "size" can
// not remain None after the above processing.
let size = size.unwrap();
// marks end of erasures
erasures.push(-1);
trace!("erasures[]:{:?} data_size: {}", erasures, size,);
let mut locks = Vec::with_capacity(NUM_DATA + NUM_CODING);
{
let mut coding_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_CODING);
let mut data_ptrs: Vec<&mut [u8]> = Vec::with_capacity(NUM_DATA);
for b in &blobs {
locks.push(b.write().unwrap());
}
for (i, l) in locks.iter_mut().enumerate() {
if i < NUM_DATA {
data_ptrs.push(&mut l.data[..size]);
} else {
coding_ptrs.push(&mut l.data_mut()[..size]);
}
}
// Decode the blocks
decode_blocks(
data_ptrs.as_mut_slice(),
coding_ptrs.as_mut_slice(),
&erasures,
)?;
}
// Create the missing blobs from the reconstructed data
let mut corrupt = false;
for i in &erasures[..erasures.len() - 1] {
let n = *i as usize;
let mut idx = n as u64 + block_start_idx;
let mut data_size;
if n < NUM_DATA {
data_size = locks[n].data_size().unwrap() as usize;
data_size -= BLOB_HEADER_SIZE;
if data_size > BLOB_DATA_SIZE {
error!("corrupt data blob[{}] data_size: {}", idx, data_size);
corrupt = true;
break;
}
} else {
data_size = size;
idx -= NUM_CODING as u64;
locks[n].set_slot(slot).unwrap();
locks[n].set_index(idx).unwrap();
if data_size - BLOB_HEADER_SIZE > BLOB_DATA_SIZE {
error!("corrupt coding blob[{}] data_size: {}", idx, data_size);
corrupt = true;
break;
}
}
locks[n].set_size(data_size);
trace!(
"erasures[{}] ({}) size: {} data[0]: {}",
*i,
idx,
data_size,
locks[n].data()[0]
);
}
let corrupt = decode_blobs(&blobs, &erasures, size, block_start_idx, slot)?;
if corrupt {
// Remove the corrupted coding blobs so there's no effort wasted in trying to
@@ -560,7 +513,7 @@ pub mod test {
use std::sync::Arc;
#[test]
pub fn test_coding() {
fn test_coding() {
let zero_vec = vec![0; 16];
let mut vs: Vec<Vec<u8>> = (0..4).map(|i| (i..(16 + i)).collect()).collect();
let v_orig: Vec<u8> = vs[0].clone();
@@ -608,6 +561,75 @@ pub mod test {
assert_eq!(v_orig, vs[0]);
}
#[test]
fn test_erasure_generate_coding() {
solana_logger::setup();
// trivial case
let mut coding_generator = CodingGenerator::new();
let blobs = Vec::new();
for _ in 0..NUM_DATA * 2 {
let coding = coding_generator.next(&blobs).unwrap();
assert_eq!(coding.len(), 0);
}
// test coding by iterating one blob at a time
let data_blobs = generate_test_blobs(0, NUM_DATA * 2);
for (i, blob) in data_blobs.iter().cloned().enumerate() {
let coding = coding_generator.next(&[blob]).unwrap();
if !coding.is_empty() {
assert_eq!(i % NUM_DATA, NUM_DATA - 1);
assert_eq!(coding.len(), NUM_CODING);
let size = coding[0].read().unwrap().size().unwrap();
// toss one data and one coding
let erasures: Vec<i32> = vec![0, NUM_DATA as i32, -1];
let block_start_idx = i - (i % NUM_DATA);
let mut blobs: Vec<SharedBlob> = Vec::with_capacity(NUM_DATA + NUM_CODING);
blobs.push(SharedBlob::default()); // empty data, erasure at zero
for blob in &data_blobs[block_start_idx + 1..block_start_idx + NUM_DATA] {
// skip first blob
blobs.push(blob.clone());
}
blobs.push(SharedBlob::default()); // empty coding, erasure at NUM_DATA
for blob in &coding[1..NUM_CODING] {
blobs.push(blob.clone());
}
let corrupt =
decode_blobs(&blobs, &erasures, size, block_start_idx as u64, 0).unwrap();
assert!(!corrupt);
assert_eq!(
blobs[1].read().unwrap().meta,
data_blobs[block_start_idx + 1].read().unwrap().meta
);
assert_eq!(
blobs[1].read().unwrap().data(),
data_blobs[block_start_idx + 1].read().unwrap().data()
);
assert_eq!(
blobs[0].read().unwrap().meta,
data_blobs[block_start_idx].read().unwrap().meta
);
assert_eq!(
blobs[0].read().unwrap().data(),
data_blobs[block_start_idx].read().unwrap().data()
);
assert_eq!(
blobs[NUM_DATA].read().unwrap().data(),
coding[0].read().unwrap().data()
);
}
}
}
// TODO: Temporary function used in tests to generate a database ledger
// from the window (which is used to generate the erasure coding)
// until we also transition generate_coding() and BroadcastStage to use DbLedger
@@ -663,6 +685,140 @@ pub mod test {
db_ledger
}
fn generate_coding(
id: &Pubkey,
window: &mut [WindowSlot],
receive_index: u64,
num_blobs: usize,
transmit_index_coding: &mut u64,
) -> Result<()> {
// beginning of the coding blobs of the block that receive_index points into
let coding_index_start =
receive_index - (receive_index % NUM_DATA as u64) + (NUM_DATA - NUM_CODING) as u64;
let start_idx = receive_index as usize % window.len();
let mut block_start = start_idx - (start_idx % NUM_DATA);
loop {
let block_end = block_start + NUM_DATA;
if block_end > (start_idx + num_blobs) {
break;
}
info!(
"generate_coding {} start: {} end: {} start_idx: {} num_blobs: {}",
id, block_start, block_end, start_idx, num_blobs
);
let mut max_data_size = 0;
// find max_data_size, maybe bail if not all the data is here
for i in block_start..block_end {
let n = i % window.len();
trace!("{} window[{}] = {:?}", id, n, window[n].data);
if let Some(b) = &window[n].data {
max_data_size = cmp::max(b.read().unwrap().meta.size, max_data_size);
} else {
trace!("{} data block is null @ {}", id, n);
return Ok(());
}
}
// round up to the nearest jerasure alignment
max_data_size = align!(max_data_size, JERASURE_ALIGN);
let mut data_blobs = Vec::with_capacity(NUM_DATA);
for i in block_start..block_end {
let n = i % window.len();
if let Some(b) = &window[n].data {
// make sure extra bytes in each blob are zero-d out for generation of
// coding blobs
let mut b_wl = b.write().unwrap();
for i in b_wl.meta.size..max_data_size {
b_wl.data[i] = 0;
}
data_blobs.push(b);
}
}
// getting ready to do erasure coding, means that we're potentially
// going back in time, tell our caller we've inserted coding blocks
// starting at coding_index_start
*transmit_index_coding = cmp::min(*transmit_index_coding, coding_index_start);
let mut coding_blobs = Vec::with_capacity(NUM_CODING);
let coding_start = block_end - NUM_CODING;
for i in coding_start..block_end {
let n = i % window.len();
assert!(window[n].coding.is_none());
window[n].coding = Some(SharedBlob::default());
let coding = window[n].coding.clone().unwrap();
let mut coding_wl = coding.write().unwrap();
for i in 0..max_data_size {
coding_wl.data[i] = 0;
}
// copy index and id from the data blob
if let Some(data) = &window[n].data {
let data_rl = data.read().unwrap();
let index = data_rl.index().unwrap();
let slot = data_rl.slot().unwrap();
let id = data_rl.id().unwrap();
trace!(
"{} copying index {} id {:?} from data to coding",
id,
index,
id
);
coding_wl.set_index(index).unwrap();
coding_wl.set_slot(slot).unwrap();
coding_wl.set_id(&id).unwrap();
}
coding_wl.set_size(max_data_size);
if coding_wl.set_coding().is_err() {
return Err(Error::ErasureError(ErasureError::EncodeError));
}
coding_blobs.push(coding.clone());
}
let data_locks: Vec<_> = data_blobs.iter().map(|b| b.read().unwrap()).collect();
let data_ptrs: Vec<_> = data_locks
.iter()
.enumerate()
.map(|(i, l)| {
trace!("{} i: {} data: {}", id, i, l.data[0]);
&l.data[..max_data_size]
})
.collect();
let mut coding_locks: Vec<_> =
coding_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut coding_ptrs: Vec<_> = coding_locks
.iter_mut()
.enumerate()
.map(|(i, l)| {
trace!("{} i: {} coding: {}", id, i, l.data[0],);
&mut l.data_mut()[..max_data_size]
})
.collect();
generate_coding_blocks(coding_ptrs.as_mut_slice(), &data_ptrs)?;
debug!(
"{} start_idx: {} data: {}:{} coding: {}:{}",
id, start_idx, block_start, block_end, coding_start, block_end
);
block_start = block_end;
}
Ok(())
}
pub fn setup_window_ledger(
offset: usize,
num_blobs: usize,
@@ -740,12 +896,14 @@ pub mod test {
blobs.push(b_);
}
{
// Make some dummy slots
let slot_tick_heights: Vec<(&SharedBlob, u64)> =
blobs.iter().zip(vec![slot; blobs.len()]).collect();
index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64);
}
// Make some dummy slots
index_blobs(
&blobs,
&Keypair::new().pubkey(),
offset as u64,
&vec![slot; blobs.len()],
);
for b in blobs {
let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;
@@ -754,6 +912,18 @@ pub mod test {
window
}
fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs();
index_blobs(
&blobs,
&Keypair::new().pubkey(),
offset as u64,
&vec![DEFAULT_SLOT_HEIGHT; blobs.len()],
);
blobs
}
fn generate_entry_window(offset: usize, num_blobs: usize) -> Vec<WindowSlot> {
let mut window = vec![
WindowSlot {
@@ -763,17 +933,8 @@ pub mod test {
};
WINDOW_SIZE
];
let entries = make_tiny_test_entries(num_blobs);
let blobs = entries.to_shared_blobs();
{
// Make some dummy slots
let slot_tick_heights: Vec<(&SharedBlob, u64)> = blobs
.iter()
.zip(vec![DEFAULT_SLOT_HEIGHT; blobs.len()])
.collect();
index_blobs(slot_tick_heights, &Keypair::new().pubkey(), offset as u64);
}
let blobs = generate_test_blobs(offset, num_blobs);
for b in blobs.into_iter() {
let idx = b.read().unwrap().index().unwrap() as usize % WINDOW_SIZE;


@@ -14,7 +14,6 @@ use crate::tpu::{Tpu, TpuReturnType};
use crate::tpu_forwarder::TpuForwarder;
use crate::tvu::{Sockets, Tvu, TvuReturnType};
use crate::vote_signer_proxy::VoteSignerProxy;
use crate::window::{new_window, SharedWindow};
use log::Level;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
@@ -98,7 +97,6 @@ pub struct Fullnode {
bank: Arc<Bank>,
cluster_info: Arc<RwLock<ClusterInfo>>,
sigverify_disabled: bool,
shared_window: SharedWindow,
tvu_sockets: Vec<UdpSocket>,
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
@@ -204,8 +202,6 @@ impl Fullnode {
let db_ledger = db_ledger.unwrap_or_else(|| Self::make_db_ledger(ledger_path));
let window = new_window(32 * 1024);
let shared_window = Arc::new(RwLock::new(window));
node.info.wallclock = timestamp();
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_keypair(
node.info,
@@ -315,7 +311,6 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone broadcast socket"),
cluster_info.clone(),
shared_window.clone(),
entry_height,
bank.leader_scheduler.clone(),
entry_receiver,
@@ -331,7 +326,6 @@ impl Fullnode {
Fullnode {
keypair,
cluster_info,
shared_window,
bank,
sigverify_disabled,
gossip_service,
@@ -487,7 +481,6 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone broadcast socket"),
self.cluster_info.clone(),
self.shared_window.clone(),
entry_height,
self.bank.leader_scheduler.clone(),
blob_receiver,


@@ -70,6 +70,7 @@ pub mod tpu;
pub mod tpu_forwarder;
pub mod tvu;
pub mod vote_signer_proxy;
#[cfg(test)]
pub mod window;
pub mod window_service;


@@ -8,7 +8,6 @@ use log::Level;
use serde::Serialize;
pub use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey;
use std::borrow::Borrow;
use std::cmp;
use std::fmt;
use std::io;
@@ -451,21 +450,14 @@ impl Blob {
}
}
pub fn index_blobs<I, J, K>(blobs: I, id: &Pubkey, mut index: u64)
where
I: IntoIterator<Item = J>,
J: Borrow<(K, u64)>,
K: Borrow<SharedBlob>,
{
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut index: u64, slots: &[u64]) {
// enumerate all the blobs, those are the indices
for b in blobs {
let (b, slot) = b.borrow();
let mut blob = b.borrow().write().unwrap();
for (blob, slot) in blobs.iter().zip(slots) {
let mut blob = blob.write().unwrap();
blob.set_index(index).expect("set_index");
blob.set_slot(*slot).expect("set_slot");
blob.set_id(id).expect("set_id");
blob.set_flags(0).unwrap();
index += 1;
}