Remove dependency on Entry::tick_height
This commit is contained in:
parent
77cb70dd80
commit
3d00992c95
|
@ -26,6 +26,7 @@ impl EntryStreamStage {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
ledger_entry_receiver: EntryReceiver,
|
ledger_entry_receiver: EntryReceiver,
|
||||||
entry_stream_socket: String,
|
entry_stream_socket: String,
|
||||||
|
mut tick_height: u64,
|
||||||
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
|
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> (Self, EntryReceiver) {
|
) -> (Self, EntryReceiver) {
|
||||||
|
@ -40,6 +41,7 @@ impl EntryStreamStage {
|
||||||
if let Err(e) = Self::process_entries(
|
if let Err(e) = Self::process_entries(
|
||||||
&ledger_entry_receiver,
|
&ledger_entry_receiver,
|
||||||
&entry_stream_sender,
|
&entry_stream_sender,
|
||||||
|
&mut tick_height,
|
||||||
&mut entry_stream,
|
&mut entry_stream,
|
||||||
) {
|
) {
|
||||||
match e {
|
match e {
|
||||||
|
@ -55,6 +57,7 @@ impl EntryStreamStage {
|
||||||
fn process_entries(
|
fn process_entries(
|
||||||
ledger_entry_receiver: &EntryReceiver,
|
ledger_entry_receiver: &EntryReceiver,
|
||||||
entry_stream_sender: &EntrySender,
|
entry_stream_sender: &EntrySender,
|
||||||
|
tick_height: &mut u64,
|
||||||
entry_stream: &mut EntryStream,
|
entry_stream: &mut EntryStream,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timeout = Duration::new(1, 0);
|
let timeout = Duration::new(1, 0);
|
||||||
|
@ -62,7 +65,10 @@ impl EntryStreamStage {
|
||||||
let leader_scheduler = entry_stream.leader_scheduler.read().unwrap();
|
let leader_scheduler = entry_stream.leader_scheduler.read().unwrap();
|
||||||
|
|
||||||
for entry in &entries {
|
for entry in &entries {
|
||||||
let slot = leader_scheduler.tick_height_to_slot(entry.tick_height);
|
if entry.is_tick() {
|
||||||
|
*tick_height += 1
|
||||||
|
}
|
||||||
|
let slot = leader_scheduler.tick_height_to_slot(*tick_height);
|
||||||
let leader_id = leader_scheduler
|
let leader_id = leader_scheduler
|
||||||
.get_leader_for_slot(slot)
|
.get_leader_for_slot(slot)
|
||||||
.map(|leader| leader.to_string())
|
.map(|leader| leader.to_string())
|
||||||
|
@ -85,10 +91,10 @@ impl EntryStreamStage {
|
||||||
.unwrap_or_else(|e| {
|
.unwrap_or_else(|e| {
|
||||||
debug!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
|
debug!("Entry Stream error: {:?}, {:?}", e, entry_stream.output);
|
||||||
});
|
});
|
||||||
if 0 == leader_scheduler.num_ticks_left_in_slot(entry.tick_height) {
|
if 0 == leader_scheduler.num_ticks_left_in_slot(*tick_height) {
|
||||||
entry_stream.queued_block = Some(EntryStreamBlock {
|
entry_stream.queued_block = Some(EntryStreamBlock {
|
||||||
slot,
|
slot,
|
||||||
tick_height: entry.tick_height,
|
tick_height: *tick_height,
|
||||||
id: entry.id,
|
id: entry.id,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -158,6 +164,7 @@ mod test {
|
||||||
EntryStreamStage::process_entries(
|
EntryStreamStage::process_entries(
|
||||||
&ledger_entry_receiver,
|
&ledger_entry_receiver,
|
||||||
&entry_stream_sender,
|
&entry_stream_sender,
|
||||||
|
&mut 1,
|
||||||
&mut entry_stream,
|
&mut entry_stream,
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
|
@ -139,6 +139,7 @@ impl Tvu {
|
||||||
let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new(
|
let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new(
|
||||||
previous_receiver,
|
previous_receiver,
|
||||||
entry_stream.unwrap().to_string(),
|
entry_stream.unwrap().to_string(),
|
||||||
|
bank.tick_height(),
|
||||||
leader_scheduler,
|
leader_scheduler,
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue