Remove injection of exit signal into cost_update_service

This commit is contained in:
Tao Zhu 2022-03-14 10:31:14 -05:00 committed by Tao Zhu
parent eb73dacd58
commit 61cead9b9b
2 changed files with 43 additions and 67 deletions

View File

@ -11,12 +11,8 @@ use {
solana_runtime::{bank::Bank, cost_model::CostModel}, solana_runtime::{bank::Bank, cost_model::CostModel},
solana_sdk::timing::timestamp, solana_sdk::timing::timestamp,
std::{ std::{
sync::{ sync::{Arc, RwLock},
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle}, thread::{self, Builder, JoinHandle},
time::Duration,
}, },
}; };
@ -74,7 +70,6 @@ pub struct CostUpdateService {
impl CostUpdateService { impl CostUpdateService {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new( pub fn new(
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>, blockstore: Arc<Blockstore>,
cost_model: Arc<RwLock<CostModel>>, cost_model: Arc<RwLock<CostModel>>,
cost_update_receiver: CostUpdateReceiver, cost_update_receiver: CostUpdateReceiver,
@ -82,7 +77,7 @@ impl CostUpdateService {
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-cost-update-service".to_string()) .name("solana-cost-update-service".to_string())
.spawn(move || { .spawn(move || {
Self::service_loop(exit, blockstore, cost_model, cost_update_receiver); Self::service_loop(blockstore, cost_model, cost_update_receiver);
}) })
.unwrap(); .unwrap();
@ -94,77 +89,62 @@ impl CostUpdateService {
} }
fn service_loop( fn service_loop(
exit: Arc<AtomicBool>,
_blockstore: Arc<Blockstore>, _blockstore: Arc<Blockstore>,
cost_model: Arc<RwLock<CostModel>>, cost_model: Arc<RwLock<CostModel>>,
cost_update_receiver: CostUpdateReceiver, cost_update_receiver: CostUpdateReceiver,
) { ) {
let mut cost_update_service_timing = CostUpdateServiceTiming::default(); let mut cost_update_service_timing = CostUpdateServiceTiming::default();
let mut update_count: u64; for cost_update in cost_update_receiver.iter() {
let wait_timer = Duration::from_millis(100); match cost_update {
CostUpdate::FrozenBank { bank } => {
loop { bank.read_cost_tracker().unwrap().report_stats(bank.slot());
if exit.load(Ordering::Relaxed) { }
break; CostUpdate::ExecuteTiming {
} mut execute_timings,
} => {
update_count = 0_u64; let (update_count, update_cost_model_time) = Measure::this(
let mut update_cost_model_time = Measure::start("update_cost_model_time"); |_| Self::update_cost_model(&cost_model, &mut execute_timings),
for cost_update in cost_update_receiver.try_iter() { (),
match cost_update { "update_cost_model_time",
CostUpdate::FrozenBank { bank } => { );
bank.read_cost_tracker().unwrap().report_stats(bank.slot()); cost_update_service_timing.update(update_count, update_cost_model_time.as_us());
}
CostUpdate::ExecuteTiming {
mut execute_timings,
} => {
Self::update_cost_model(&cost_model, &mut execute_timings);
update_count += 1;
}
} }
} }
update_cost_model_time.stop();
cost_update_service_timing.update(update_count, update_cost_model_time.as_us());
thread::sleep(wait_timer);
} }
} }
fn update_cost_model( fn update_cost_model(
cost_model: &RwLock<CostModel>, cost_model: &RwLock<CostModel>,
execute_timings: &mut ExecuteTimings, execute_timings: &mut ExecuteTimings,
) -> bool { ) -> u64 {
let mut dirty = false; let mut update_count = 0_u64;
{ for (program_id, program_timings) in &mut execute_timings.details.per_program_timings {
for (program_id, program_timings) in &mut execute_timings.details.per_program_timings { let current_estimated_program_cost =
let current_estimated_program_cost = cost_model.read().unwrap().find_instruction_cost(program_id);
cost_model.read().unwrap().find_instruction_cost(program_id); program_timings.coalesce_error_timings(current_estimated_program_cost);
program_timings.coalesce_error_timings(current_estimated_program_cost);
if program_timings.count < 1 { if program_timings.count < 1 {
continue; continue;
}
let units = program_timings.accumulated_units / program_timings.count as u64;
match cost_model
.write()
.unwrap()
.upsert_instruction_cost(program_id, units)
{
Ok(c) => {
debug!(
"after replayed into bank, instruction {:?} has averaged cost {}",
program_id, c
);
update_count += 1;
} }
Err(err) => {
let units = program_timings.accumulated_units / program_timings.count as u64; debug!(
match cost_model
.write()
.unwrap()
.upsert_instruction_cost(program_id, units)
{
Ok(c) => {
debug!(
"after replayed into bank, instruction {:?} has averaged cost {}",
program_id, c
);
dirty = true;
}
Err(err) => {
debug!(
"after replayed into bank, instruction {:?} failed to update cost, err: {}", "after replayed into bank, instruction {:?} failed to update cost, err: {}",
program_id, err program_id, err
); );
}
} }
} }
} }
@ -172,7 +152,7 @@ impl CostUpdateService {
"after replayed into bank, updated cost model instruction cost table, current values: {:?}", "after replayed into bank, updated cost model instruction cost table, current values: {:?}",
cost_model.read().unwrap().get_instruction_cost_table() cost_model.read().unwrap().get_instruction_cost_table()
); );
dirty update_count
} }
} }

View File

@ -309,12 +309,8 @@ impl Tvu {
); );
let (cost_update_sender, cost_update_receiver) = unbounded(); let (cost_update_sender, cost_update_receiver) = unbounded();
let cost_update_service = CostUpdateService::new( let cost_update_service =
exit.clone(), CostUpdateService::new(blockstore.clone(), cost_model.clone(), cost_update_receiver);
blockstore.clone(),
cost_model.clone(),
cost_update_receiver,
);
let (drop_bank_sender, drop_bank_receiver) = unbounded(); let (drop_bank_sender, drop_bank_receiver) = unbounded();