Broadcast final shred for slots that are interrupted (#6269)

* Broadcast final shred for slots that are interrupted
This commit is contained in:
carllin 2019-10-09 16:07:18 -07:00 committed by GitHub
parent de82e60c64
commit dd66d16fdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 388 additions and 183 deletions

View File

@ -4,7 +4,7 @@ extern crate test;
use solana_core::entry::create_ticks;
use solana_core::shred::{
max_ticks_per_shred, Shredder, RECOMMENDED_FEC_RATE, SIZE_OF_DATA_SHRED_HEADER,
max_ticks_per_n_shreds, Shredder, RECOMMENDED_FEC_RATE, SIZE_OF_DATA_SHRED_HEADER,
};
use solana_sdk::hash::Hash;
use solana_sdk::packet::PACKET_DATA_SIZE;
@ -18,7 +18,7 @@ fn bench_shredder(bencher: &mut Bencher) {
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size;
// ~1Mb
let num_ticks = max_ticks_per_shred() * num_shreds as u64;
let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
let entries = create_ticks(num_ticks, Hash::default());
bencher.iter(|| {
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone()).unwrap();
@ -32,7 +32,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
// ~10Mb
let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
let num_ticks = max_ticks_per_shred() * num_shreds as u64;
let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
let entries = create_ticks(num_ticks, Hash::default());
let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp).unwrap();
let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;

View File

@ -1653,7 +1653,7 @@ pub mod tests {
use super::*;
use crate::entry::{create_ticks, Entry};
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
use crate::shred::max_ticks_per_shred;
use crate::shred::max_ticks_per_n_shreds;
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::thread_rng;
@ -1682,7 +1682,7 @@ pub mod tests {
#[test]
fn test_insert_get_bytes() {
// Create enough entries to ensure there are at least two shreds created
let num_entries = max_ticks_per_shred() + 1;
let num_entries = max_ticks_per_n_shreds(1) + 1;
assert!(num_entries > 1);
let (mut shreds, _) = make_slot_entries(0, 0, num_entries);
@ -1921,7 +1921,7 @@ pub mod tests {
#[test]
fn test_insert_data_shreds_basic() {
// Create enough entries to ensure there are at least two shreds created
let num_entries = max_ticks_per_shred() + 1;
let num_entries = max_ticks_per_n_shreds(1) + 1;
assert!(num_entries > 1);
let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
@ -2144,7 +2144,7 @@ pub mod tests {
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Create enough entries to ensure there are at least two shreds created
let min_entries = max_ticks_per_shred() + 1;
let min_entries = max_ticks_per_n_shreds(1) + 1;
for i in 0..4 {
let slot = i;
let parent_slot = if i == 0 { 0 } else { i - 1 };
@ -2557,7 +2557,7 @@ pub mod tests {
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 15;
// Create enough entries to ensure there are at least two shreds created
let entries_per_slot = max_ticks_per_shred() + 1;
let entries_per_slot = max_ticks_per_n_shreds(1) + 1;
assert!(entries_per_slot > 1);
let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
@ -2907,7 +2907,7 @@ pub mod tests {
let gap: u64 = 10;
assert!(gap > 3);
// Create enough entries to ensure there are at least two shreds created
let num_entries = max_ticks_per_shred() + 1;
let num_entries = max_ticks_per_n_shreds(1) + 1;
let entries = create_ticks(num_entries, Hash::default());
let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
let num_shreds = shreds.len();

View File

@ -8,7 +8,7 @@ use crate::poh_recorder::WorkingBankEntry;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::staking_utils;
use solana_metrics::{datapoint_debug, inc_new_counter_error, inc_new_counter_info};
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};

View File

@ -15,7 +15,7 @@ pub(super) struct ReceiveResults {
#[derive(Copy, Clone)]
pub struct UnfinishedSlotInfo {
pub next_index: u64,
pub next_shred_index: u32,
pub slot: u64,
pub parent: u64,
}

View File

@ -1,16 +1,14 @@
use super::broadcast_utils;
use super::broadcast_utils::{self, ReceiveResults};
use super::*;
use crate::shred::{Shredder, RECOMMENDED_FEC_RATE};
use solana_sdk::timing::duration_as_ms;
use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
use crate::entry::Entry;
use crate::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE};
use solana_sdk::signature::Keypair;
use solana_sdk::timing::duration_as_us;
use std::time::Duration;
#[derive(Default)]
struct BroadcastStats {
num_entries: Vec<usize>,
run_elapsed: Vec<u64>,
to_blobs_elapsed: Vec<u64>,
slots: Vec<u64>,
// Per-slot elapsed time
shredding_elapsed: u64,
insert_shreds_elapsed: u64,
@ -19,9 +17,20 @@ struct BroadcastStats {
clone_and_seed_elapsed: u64,
}
impl BroadcastStats {
fn reset(&mut self) {
self.insert_shreds_elapsed = 0;
self.shredding_elapsed = 0;
self.broadcast_elapsed = 0;
self.receive_elapsed = 0;
self.clone_and_seed_elapsed = 0;
}
}
pub(super) struct StandardBroadcastRun {
stats: BroadcastStats,
current_slot: Option<u64>,
unfinished_slot: Option<UnfinishedSlotInfo>,
current_slot_and_parent: Option<(u64, u64)>,
slot_broadcast_start: Option<Instant>,
}
@ -29,177 +38,162 @@ impl StandardBroadcastRun {
pub(super) fn new() -> Self {
Self {
stats: BroadcastStats::default(),
current_slot: None,
unfinished_slot: None,
current_slot_and_parent: None,
slot_broadcast_start: None,
}
}
#[allow(clippy::too_many_arguments)]
fn update_broadcast_stats(
&mut self,
receive_entries_elapsed: u64,
shredding_elapsed: u64,
insert_shreds_elapsed: u64,
broadcast_elapsed: u64,
run_elapsed: u64,
clone_and_seed_elapsed: u64,
num_entries: usize,
num_shreds: usize,
shred_index: u32,
slot: u64,
slot_ended: bool,
latest_shred_index: u32,
) {
self.stats.insert_shreds_elapsed += insert_shreds_elapsed;
self.stats.shredding_elapsed += shredding_elapsed;
self.stats.broadcast_elapsed += broadcast_elapsed;
self.stats.receive_elapsed += receive_entries_elapsed;
self.stats.clone_and_seed_elapsed += clone_and_seed_elapsed;
fn check_for_interrupted_slot(&mut self) -> Option<Shred> {
let (slot, _) = self.current_slot_and_parent.unwrap();
let last_unfinished_slot_shred = self
.unfinished_slot
.map(|last_unfinished_slot| {
if last_unfinished_slot.slot != slot {
self.report_and_reset_stats();
Some(Shred::new_from_data(
last_unfinished_slot.slot,
last_unfinished_slot.next_shred_index,
(last_unfinished_slot.slot - last_unfinished_slot.parent) as u16,
None,
true,
true,
))
} else {
None
}
})
.unwrap_or(None);
if slot_ended {
datapoint_info!(
"broadcast-bank-stats",
("slot", slot as i64, i64),
("shredding_time", self.stats.shredding_elapsed as i64, i64),
(
"insertion_time",
self.stats.insert_shreds_elapsed as i64,
i64
),
("broadcast_time", self.stats.broadcast_elapsed as i64, i64),
("receive_time", self.stats.receive_elapsed as i64, i64),
(
"clone_and_seed",
self.stats.clone_and_seed_elapsed as i64,
i64
),
("num_shreds", i64::from(latest_shred_index), i64),
(
"slot_broadcast_time",
self.slot_broadcast_start.unwrap().elapsed().as_millis() as i64,
i64
),
);
self.stats.insert_shreds_elapsed = 0;
self.stats.shredding_elapsed = 0;
self.stats.broadcast_elapsed = 0;
self.stats.receive_elapsed = 0;
self.stats.clone_and_seed_elapsed = 0;
// This shred should only be Some if the previous slot was interrupted
if last_unfinished_slot_shred.is_some() {
self.unfinished_slot = None;
}
inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);
self.stats.num_entries.push(num_entries);
self.stats.to_blobs_elapsed.push(shredding_elapsed);
self.stats.run_elapsed.push(run_elapsed);
self.stats.slots.push(slot);
if self.stats.num_entries.len() >= 16 {
info!(
"broadcast: entries: {:?} blob times ms: {:?} broadcast times ms: {:?} slots: {:?}",
self.stats.num_entries,
self.stats.to_blobs_elapsed,
self.stats.run_elapsed,
self.stats.slots,
);
self.stats.num_entries.clear();
self.stats.to_blobs_elapsed.clear();
self.stats.run_elapsed.clear();
self.stats.slots.clear();
}
datapoint_debug!(
"broadcast-service",
("num_entries", num_entries as i64, i64),
("num_shreds", num_shreds as i64, i64),
("receive_time", receive_entries_elapsed as i64, i64),
("shredding_time", shredding_elapsed as i64, i64),
("insert_shred_time", insert_shreds_elapsed as i64, i64),
("broadcast_time", broadcast_elapsed as i64, i64),
("transmit-index", i64::from(shred_index), i64),
);
last_unfinished_slot_shred
}
}
impl BroadcastRun for StandardBroadcastRun {
fn run(
fn coalesce_shreds(
data_shreds: Vec<Shred>,
coding_shreds: Vec<Shred>,
last_unfinished_slot_shred: Option<Shred>,
) -> Vec<Shred> {
if let Some(shred) = last_unfinished_slot_shred {
data_shreds
.iter()
.chain(coding_shreds.iter())
.cloned()
.chain(std::iter::once(shred))
.collect::<Vec<_>>()
} else {
data_shreds
.iter()
.chain(coding_shreds.iter())
.cloned()
.collect::<Vec<_>>()
}
}
fn entries_to_shreds(
&mut self,
blocktree: &Blocktree,
entries: &[Entry],
keypair: Arc<Keypair>,
is_slot_end: bool,
) -> (Vec<Shred>, Vec<Shred>) {
let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
let shredder = Shredder::new(slot, parent_slot, RECOMMENDED_FEC_RATE, keypair)
.expect("Expected to create a new shredder");
let next_shred_index = self
.unfinished_slot
.map(|s| s.next_shred_index)
.unwrap_or_else(|| {
blocktree
.meta(slot)
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0) as u32
});
let (data_shreds, coding_shreds, new_next_shred_index) =
shredder.entries_to_shreds(entries, is_slot_end, next_shred_index);
self.unfinished_slot = Some(UnfinishedSlotInfo {
next_shred_index: new_next_shred_index,
slot,
parent: parent_slot,
});
(data_shreds, coding_shreds)
}
fn process_receive_results(
&mut self,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntry>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
receive_results: ReceiveResults,
) -> Result<()> {
// 1) Pull entries from banking stage
let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
let mut receive_elapsed = receive_results.time_elapsed;
let num_entries = receive_results.entries.len();
let bank = receive_results.bank.clone();
let last_tick = receive_results.last_tick;
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
if Some(bank.slot()) != self.current_slot {
if self.current_slot_and_parent.is_none()
|| bank.slot() != self.current_slot_and_parent.unwrap().0
{
self.slot_broadcast_start = Some(Instant::now());
self.current_slot = Some(bank.slot());
let slot = bank.slot();
let parent_slot = {
if let Some(parent_bank) = bank.parent() {
parent_bank.slot()
} else {
0
}
};
self.current_slot_and_parent = Some((slot, parent_slot));
receive_elapsed = Duration::new(0, 0);
}
// 2) Convert entries to blobs + generate coding blobs
let keypair = &cluster_info.read().unwrap().keypair.clone();
let next_shred_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0) as u32;
let keypair = cluster_info.read().unwrap().keypair.clone();
let parent_slot = if let Some(parent_bank) = bank.parent() {
parent_bank.slot()
} else {
0
};
// Create shreds from entries
let to_shreds_start = Instant::now();
let shredder = Shredder::new(
bank.slot(),
parent_slot,
RECOMMENDED_FEC_RATE,
keypair.clone(),
)
.expect("Expected to create a new shredder");
let (data_shreds, coding_shreds, latest_shred_index) = shredder.entries_to_shreds(
// 1) Check if slot was interrupted
let last_unfinished_slot_shred = self.check_for_interrupted_slot();
// 2) Convert entries to shreds and coding shreds
let (data_shreds, coding_shreds) = self.entries_to_shreds(
blocktree,
&receive_results.entries,
keypair,
last_tick == bank.max_tick_height(),
next_shred_index,
);
let to_shreds_elapsed = to_shreds_start.elapsed();
let clone_and_seed_start = Instant::now();
let all_shreds = data_shreds
.iter()
.cloned()
.chain(coding_shreds.iter().cloned())
.collect::<Vec<_>>();
let all_shreds =
Self::coalesce_shreds(data_shreds, coding_shreds, last_unfinished_slot_shred);
let all_shreds_ = all_shreds.clone();
let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
let num_shreds = all_shreds.len();
let clone_and_seed_elapsed = clone_and_seed_start.elapsed();
// Insert shreds into blocktree
// 3) Insert shreds into blocktree
let insert_shreds_start = Instant::now();
blocktree
.insert_shreds(all_shreds, None)
.insert_shreds(all_shreds_, None)
.expect("Failed to insert shreds in blocktree");
let insert_shreds_elapsed = insert_shreds_start.elapsed();
// 3) Start broadcast step
// 4) Broadcast the shreds
let broadcast_start = Instant::now();
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
let all_shred_bufs: Vec<Vec<u8>> = data_shreds
.into_iter()
.chain(coding_shreds.into_iter())
.map(|s| s.payload)
.collect();
let all_shred_bufs: Vec<Vec<u8>> = all_shreds.into_iter().map(|s| s.payload).collect();
trace!("Broadcasting {:?} shreds", all_shred_bufs.len());
cluster_info.read().unwrap().broadcast_shreds(
@ -212,22 +206,223 @@ impl BroadcastRun for StandardBroadcastRun {
let broadcast_elapsed = broadcast_start.elapsed();
self.update_broadcast_stats(
duration_as_ms(&receive_elapsed),
duration_as_ms(&to_shreds_elapsed),
duration_as_ms(&insert_shreds_elapsed),
duration_as_ms(&broadcast_elapsed),
duration_as_ms(&clone_and_seed_elapsed),
duration_as_ms(
&(receive_elapsed + to_shreds_elapsed + insert_shreds_elapsed + broadcast_elapsed),
),
num_entries,
num_shreds,
next_shred_index,
bank.slot(),
duration_as_us(&receive_elapsed),
duration_as_us(&to_shreds_elapsed),
duration_as_us(&insert_shreds_elapsed),
duration_as_us(&broadcast_elapsed),
duration_as_us(&clone_and_seed_elapsed),
last_tick == bank.max_tick_height(),
latest_shred_index,
);
if last_tick == bank.max_tick_height() {
self.unfinished_slot = None;
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn update_broadcast_stats(
&mut self,
receive_entries_elapsed: u64,
shredding_elapsed: u64,
insert_shreds_elapsed: u64,
broadcast_elapsed: u64,
clone_and_seed_elapsed: u64,
slot_ended: bool,
) {
self.stats.receive_elapsed += receive_entries_elapsed;
self.stats.shredding_elapsed += shredding_elapsed;
self.stats.insert_shreds_elapsed += insert_shreds_elapsed;
self.stats.broadcast_elapsed += broadcast_elapsed;
self.stats.clone_and_seed_elapsed += clone_and_seed_elapsed;
if slot_ended {
self.report_and_reset_stats()
}
}
fn report_and_reset_stats(&mut self) {
assert!(self.unfinished_slot.is_some());
datapoint_info!(
"broadcast-bank-stats",
("slot", self.unfinished_slot.unwrap().slot as i64, i64),
("shredding_time", self.stats.shredding_elapsed as i64, i64),
(
"insertion_time",
self.stats.insert_shreds_elapsed as i64,
i64
),
("broadcast_time", self.stats.broadcast_elapsed as i64, i64),
("receive_time", self.stats.receive_elapsed as i64, i64),
(
"clone_and_seed",
self.stats.clone_and_seed_elapsed as i64,
i64
),
(
"num_shreds",
i64::from(self.unfinished_slot.unwrap().next_shred_index),
i64
),
(
"slot_broadcast_time",
self.slot_broadcast_start.unwrap().elapsed().as_millis() as i64,
i64
),
);
self.stats.reset();
}
}
impl BroadcastRun for StandardBroadcastRun {
fn run(
&mut self,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntry>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
self.process_receive_results(cluster_info, sock, blocktree, receive_results)
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::cluster_info::{ClusterInfo, Node};
use crate::entry::create_ticks;
use crate::genesis_utils::create_genesis_block;
use crate::shred::max_ticks_per_n_shreds;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::{Arc, RwLock};
use std::time::Duration;
fn setup(
num_shreds_per_slot: u64,
) -> (
Arc<Blocktree>,
GenesisBlock,
Arc<RwLock<ClusterInfo>>,
Arc<Bank>,
Keypair,
UdpSocket,
) {
// Setup
let ledger_path = get_tmp_ledger_path!();
let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let leader_keypair = Keypair::new();
let leader_pubkey = leader_keypair.pubkey();
let leader_info = Node::new_localhost_with_pubkey(&leader_pubkey);
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
leader_info.info.clone(),
)));
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut genesis_block = create_genesis_block(10_000).genesis_block;
genesis_block.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot) + 1;
let bank0 = Arc::new(Bank::new(&genesis_block));
(
blocktree,
genesis_block,
cluster_info,
bank0,
leader_keypair,
socket,
)
}
#[test]
fn test_slot_interrupt() {
// Setup
let num_shreds_per_slot = 2;
let (blocktree, genesis_block, cluster_info, bank0, leader_keypair, socket) =
setup(num_shreds_per_slot);
// Insert 1 less than the number of ticks needed to finish the slot
let ticks = create_ticks(genesis_block.ticks_per_slot - 1, genesis_block.hash());
let receive_results = ReceiveResults {
entries: ticks.clone(),
time_elapsed: Duration::new(3, 0),
bank: bank0.clone(),
last_tick: (ticks.len() - 1) as u64,
};
// Step 1: Make an incomplete transmission for slot 0
let mut standard_broadcast_run = StandardBroadcastRun::new();
standard_broadcast_run
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
.unwrap();
let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
assert_eq!(unfinished_slot.next_shred_index as u64, num_shreds_per_slot);
assert_eq!(unfinished_slot.slot, 0);
assert_eq!(unfinished_slot.parent, 0);
// Make sure the slot is not complete
assert!(!blocktree.is_full(0));
// Modify the stats, should reset later
standard_broadcast_run.stats.receive_elapsed = 10;
// Try to fetch ticks from blocktree, nothing should break
assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), ticks);
assert_eq!(
blocktree
.get_slot_entries(0, num_shreds_per_slot, None)
.unwrap(),
vec![],
);
// Step 2: Make a transmission for another bank that interrupts the transmission for
// slot 0
let bank2 = Arc::new(Bank::new_from_parent(&bank0, &leader_keypair.pubkey(), 2));
// Interrupting the slot should cause the unfinished_slot and stats to reset
let num_shreds = 1;
assert!(num_shreds < num_shreds_per_slot);
let ticks = create_ticks(max_ticks_per_n_shreds(num_shreds), genesis_block.hash());
let receive_results = ReceiveResults {
entries: ticks.clone(),
time_elapsed: Duration::new(2, 0),
bank: bank2.clone(),
last_tick: (ticks.len() - 1) as u64,
};
standard_broadcast_run
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
.unwrap();
let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
// The shred index should have reset to 0, which makes it possible for the
// index < the previous shred index for slot 0
assert_eq!(unfinished_slot.next_shred_index as u64, num_shreds);
assert_eq!(unfinished_slot.slot, 2);
assert_eq!(unfinished_slot.parent, 0);
// Check that the stats were reset as well
assert_eq!(standard_broadcast_run.stats.receive_elapsed, 0);
}
#[test]
fn test_slot_finish() {
// Setup
let num_shreds_per_slot = 2;
let (blocktree, genesis_block, cluster_info, bank0, _, socket) = setup(num_shreds_per_slot);
// Insert complete slot of ticks needed to finish the slot
let ticks = create_ticks(genesis_block.ticks_per_slot, genesis_block.hash());
let receive_results = ReceiveResults {
entries: ticks.clone(),
time_elapsed: Duration::new(3, 0),
bank: bank0.clone(),
last_tick: (ticks.len() - 1) as u64,
};
let mut standard_broadcast_run = StandardBroadcastRun::new();
standard_broadcast_run
.process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
.unwrap();
assert!(standard_broadcast_run.unfinished_slot.is_none())
}
}

View File

@ -1771,7 +1771,7 @@ mod tests {
use crate::crds_value::CrdsValueLabel;
use crate::repair_service::RepairType;
use crate::result::Error;
use crate::shred::max_ticks_per_shred;
use crate::shred::max_ticks_per_n_shreds;
use crate::shred::{DataShredHeader, Shred};
use crate::test_tx::test_tx;
use solana_sdk::hash::Hash;
@ -1976,7 +1976,7 @@ mod tests {
let _ = fill_blocktree_slot_with_ticks(
&blocktree,
max_ticks_per_shred() + 1,
max_ticks_per_n_shreds(1) + 1,
2,
1,
Hash::default(),

View File

@ -7,6 +7,8 @@
pub mod bank_forks;
pub mod banking_stage;
#[macro_use]
pub mod blocktree;
pub mod broadcast_stage;
pub mod chacha;
pub mod chacha_cuda;
@ -17,20 +19,18 @@ pub mod recycler;
pub mod shred_fetch_stage;
#[macro_use]
pub mod contact_info;
pub mod crds;
pub mod crds_gossip;
pub mod crds_gossip_error;
pub mod crds_gossip_pull;
pub mod crds_gossip_push;
pub mod crds_value;
#[macro_use]
pub mod blocktree;
pub mod blockstream;
pub mod blockstream_service;
pub mod blocktree_processor;
pub mod cluster_info;
pub mod cluster_info_repair_listener;
pub mod consensus;
pub mod crds;
pub mod crds_gossip;
pub mod crds_gossip_error;
pub mod crds_gossip_pull;
pub mod crds_gossip_push;
pub mod crds_value;
pub mod cuda_runtime;
pub mod entry;
pub mod erasure;

View File

@ -406,7 +406,7 @@ mod test {
};
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::cluster_info::Node;
use crate::shred::max_ticks_per_shred;
use crate::shred::max_ticks_per_n_shreds;
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
@ -538,7 +538,7 @@ mod test {
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
let num_entries_per_slot = max_ticks_per_shred() + 1;
let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
for (mut slot_shreds, _) in shreds.into_iter() {

View File

@ -122,14 +122,23 @@ impl Shred {
index: u32,
parent_offset: u16,
data: Option<&[u8]>,
flags: u8,
is_last_data: bool,
is_last_in_slot: bool,
) -> Self {
let mut shred_buf = vec![0; PACKET_DATA_SIZE];
let mut header = DataShredHeader::default();
header.data_header.slot = slot;
header.data_header.index = index;
header.parent_offset = parent_offset;
header.flags = flags;
header.flags = 0;
if is_last_data {
header.flags |= DATA_COMPLETE_SHRED
}
if is_last_in_slot {
header.flags |= LAST_SHRED_IN_SLOT
}
if let Some(data) = data {
bincode::serialize_into(&mut shred_buf[..*SIZE_OF_DATA_SHRED_HEADER], &header)
@ -345,20 +354,21 @@ impl Shredder {
.map(|(i, shred_data)| {
let shred_index = next_shred_index + i as u32;
let mut header: u8 = 0;
if shred_index == last_shred_index {
header |= DATA_COMPLETE_SHRED;
if is_last_in_slot {
header |= LAST_SHRED_IN_SLOT;
let (is_last_data, is_last_in_slot) = {
if shred_index == last_shred_index {
(true, is_last_in_slot)
} else {
(false, false)
}
}
};
let mut shred = Shred::new_from_data(
self.slot,
shred_index,
(self.slot - self.parent_slot) as u16,
Some(shred_data),
header,
is_last_data,
is_last_in_slot,
);
Shredder::sign_shred(
@ -663,9 +673,9 @@ impl Shredder {
}
}
pub fn max_ticks_per_shred() -> u64 {
pub fn max_ticks_per_n_shreds(num_shreds: u64) -> u64 {
let ticks = create_ticks(1, Hash::default());
max_entries_per_n_shred(&ticks[0], 1)
max_entries_per_n_shred(&ticks[0], num_shreds)
}
pub fn max_entries_per_n_shred(entry: &Entry, num_shreds: u64) -> u64 {
@ -848,7 +858,7 @@ pub mod tests {
.expect("Failed in creating shredder");
// Create enough entries to make > 1 shred
let num_entries = max_ticks_per_shred() + 1;
let num_entries = max_ticks_per_n_shreds(1) + 1;
let entries: Vec<_> = (0..num_entries)
.map(|_| {
let keypair0 = Keypair::new();