Stop passing blob_index unnecessarily into ReplayStage
parent bf3d2bd2ec
commit d25fc7a649
@@ -239,7 +239,7 @@ impl Fullnode {
         };
 
         // Figure which node should generate the next tick
-        let (next_leader, next_slot, blob_index) = {
+        let (next_leader, next_slot) = {
             let next_tick = bank.tick_height() + 1;
 
             let leader_scheduler = leader_scheduler.read().unwrap();
@@ -248,28 +248,20 @@ impl Fullnode {
             let next_leader = leader_scheduler
                 .get_leader_for_slot(next_slot)
                 .expect("Leader not known after processing bank");
-            let blob_index = if let Some(meta) = blocktree.meta(next_slot).expect("Database error")
-            {
-                meta.consumed
-            } else {
-                0
-            };
 
             trace!(
-                "node {:?} scheduled as leader for slot {}, starting blob_index={}",
+                "node {:?} scheduled as leader for slot {}",
                 next_leader,
                 next_slot,
-                blob_index,
             );
 
-            (next_leader, next_slot, blob_index)
+            (next_leader, next_slot)
         };
         // END TODO
 
         let tvu = Tvu::new(
             voting_keypair_option,
             &bank_forks,
-            blob_index,
             entry_height,
             last_entry_id,
             &cluster_info,
@@ -302,7 +294,7 @@ impl Fullnode {
         };
 
         // TODO: This first rotate should come from the Tvu/ReplayStage
-        fullnode.rotate(&bank, next_leader, next_slot, blob_index, &last_entry_id);
+        fullnode.rotate(&bank, next_leader, next_slot, &last_entry_id);
         inc_new_counter_info!("fullnode-new", 1);
         fullnode
     }
@@ -312,7 +304,6 @@ impl Fullnode {
         bank: &Arc<Bank>,
         leader: Pubkey,
         slot: u64,
-        blob_index: u64,
         last_entry_id: &Hash,
     ) -> FullnodeReturnType {
         if leader == self.id {
@@ -342,7 +333,6 @@ impl Fullnode {
                     .expect("Failed to clone broadcast socket"),
                 self.sigverify_disabled,
                 slot,
-                blob_index,
                 last_entry_id,
                 &self.blocktree,
                 &self.leader_scheduler,
@@ -392,7 +382,7 @@ impl Fullnode {
             //self.bank_forks.write().set_working_bank_id(bank_id);
             let bank = self.bank_forks.read().unwrap().working_bank();
 
-            let transition = self.rotate(&bank, leader, slot, 0, &bank.last_id());
+            let transition = self.rotate(&bank, leader, slot, &bank.last_id());
             debug!("role transition complete: {:?}", transition);
             if let Some(ref rotation_notifier) = rotation_notifier {
                 rotation_notifier.send((transition, slot)).unwrap();

@@ -188,7 +188,6 @@ impl ReplayStage {
         bank: Arc<Bank>,
         cluster_info: Arc<RwLock<ClusterInfo>>,
         exit: Arc<AtomicBool>,
-        mut current_blob_index: u64,
         last_entry_id: Hash,
         to_leader_sender: &TvuRotationSender,
         ledger_signal_receiver: Receiver<bool>,
@@ -201,6 +200,7 @@ impl ReplayStage {
         let to_leader_sender = to_leader_sender.clone();
         let last_entry_id = Arc::new(RwLock::new(last_entry_id));
         let subscriptions_ = subscriptions.clone();
+
         let t_replay = Builder::new()
             .name("solana-replay-stage".to_string())
             .spawn(move || {
@@ -222,7 +222,11 @@ impl ReplayStage {
                             + leader_scheduler.num_ticks_left_in_slot(first_tick_in_current_slot),
                     )
                 };
+                let mut current_blob_index = blocktree
+                    .meta(current_slot.unwrap())
+                    .expect("Database error")
+                    .map(|meta| meta.consumed)
+                    .unwrap_or(0);
                 let mut fees = 0;
 
                 // Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each
@@ -464,7 +468,6 @@ mod test {
             bank.clone(),
             Arc::new(RwLock::new(cluster_info_me)),
             exit.clone(),
-            meta.consumed,
             last_entry_id,
             &rotation_sender,
             l_receiver,
@@ -568,7 +571,6 @@ mod test {
             bank.clone(),
             cluster_info_me.clone(),
             exit.clone(),
-            entry_height,
             last_entry_id,
             &to_leader_sender,
             l_receiver,
@@ -694,7 +696,6 @@ mod test {
             bank.clone(),
             cluster_info_me.clone(),
             exit.clone(),
-            meta.consumed,
             last_entry_id,
             &rotation_tx,
             l_receiver,

@@ -197,7 +197,6 @@ impl Tpu {
         broadcast_socket: UdpSocket,
         sigverify_disabled: bool,
         slot: u64,
-        blob_index: u64,
         last_entry_id: &Hash,
         blocktree: &Arc<Blocktree>,
         leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
@@ -225,6 +224,11 @@ impl Tpu {
         // TODO: Fix BankingStage/BroadcastService to operate on `slot` directly instead of
         // `max_tick_height`
         let max_tick_height = (slot + 1) * leader_scheduler.read().unwrap().ticks_per_slot - 1;
+        let blob_index = blocktree
+            .meta(slot)
+            .expect("Database error")
+            .map(|meta| meta.consumed)
+            .unwrap_or(0);
 
         let (banking_stage, entry_receiver) = BankingStage::new(
             &bank,

@@ -62,7 +62,6 @@ impl Tvu {
     /// # Arguments
     /// * `bank` - The bank state.
     /// * `entry_height` - Initial ledger height
-    /// * `blob_index` - Index of last processed blob
     /// * `last_entry_id` - Hash of the last entry
     /// * `cluster_info` - The cluster_info state.
     /// * `sockets` - My fetch, repair, and restransmit sockets
@@ -71,7 +70,6 @@ impl Tvu {
     pub fn new(
         voting_keypair: Option<Arc<VotingKeypair>>,
         bank_forks: &Arc<RwLock<BankForks>>,
-        blob_index: u64,
         entry_height: u64,
         last_entry_id: Hash,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
@@ -129,7 +127,6 @@ impl Tvu {
             bank.clone(),
             cluster_info.clone(),
             exit.clone(),
-            blob_index,
             last_entry_id,
             to_leader_sender,
             ledger_signal_receiver,
@@ -247,7 +244,6 @@ pub mod tests {
             Some(Arc::new(voting_keypair)),
             &Arc::new(RwLock::new(bank_forks)),
             0,
-            0,
             cur_hash,
             &cref1,
             {

@@ -114,7 +114,6 @@ fn test_replay() {
         Some(Arc::new(voting_keypair)),
         &Arc::new(RwLock::new(bank_forks)),
         0,
-        0,
         cur_hash,
         &cref1,
         {
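
For context, the pattern this change settles on is to derive the starting blob index from the slot's metadata at the point of use (ReplayStage and Tpu now perform the blocktree.meta(slot) lookup themselves), instead of computing it once in Fullnode::new and threading it through Tvu::new and rotate(). Below is a minimal, self-contained sketch of that lookup; SlotMeta and MockBlocktree are simplified, hypothetical stand-ins for the real blocktree types, reduced to the `consumed` field used here.

use std::collections::HashMap;

/// Simplified stand-in for blocktree slot metadata; only the field this sketch needs.
struct SlotMeta {
    /// Number of blobs already consumed for this slot.
    consumed: u64,
}

/// Hypothetical in-memory "blocktree" keyed by slot, for illustration only.
struct MockBlocktree {
    metas: HashMap<u64, SlotMeta>,
}

impl MockBlocktree {
    /// Mirrors the shape of a fallible metadata lookup: Ok(None) when the slot
    /// has no metadata yet, Err on a (simulated) database error.
    fn meta(&self, slot: u64) -> Result<Option<&SlotMeta>, String> {
        Ok(self.metas.get(&slot))
    }
}

/// Derive the starting blob index for a slot: resume from `consumed` when the
/// slot already has metadata, otherwise start at 0.
fn starting_blob_index(blocktree: &MockBlocktree, slot: u64) -> u64 {
    blocktree
        .meta(slot)
        .expect("Database error")
        .map(|meta| meta.consumed)
        .unwrap_or(0)
}

fn main() {
    let mut metas = HashMap::new();
    metas.insert(3, SlotMeta { consumed: 42 });
    let blocktree = MockBlocktree { metas };

    // A slot with metadata resumes where it left off; an unknown slot starts at 0.
    assert_eq!(starting_blob_index(&blocktree, 3), 42);
    assert_eq!(starting_blob_index(&blocktree, 7), 0);
}

The trade-off is one extra local blocktree read in each consumer rather than a shared parameter, which shortens the constructor signatures of Tvu, ReplayStage, Tpu, and Fullnode::rotate and keeps the index consistent with whatever the blocktree actually holds when the stage starts.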