The getConfirmedBlock RPC API is now disabled by default

The --enable-rpc-get-confirmed-block flag allows validators to opt-in to
the higher disk usage and IOPS.
This commit is contained in:
Michael Vines 2020-02-11 18:01:49 -07:00
parent ad43babe3d
commit c4fd81fc1c
8 changed files with 40 additions and 33 deletions

View File

@ -78,7 +78,7 @@ pub struct ReplayStageConfig {
pub snapshot_package_sender: Option<SnapshotPackageSender>, pub snapshot_package_sender: Option<SnapshotPackageSender>,
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>, pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
pub transaction_status_sender: Option<TransactionStatusSender>, pub transaction_status_sender: Option<TransactionStatusSender>,
pub rewards_sender: Option<RewardsRecorderSender>, pub rewards_recorder_sender: Option<RewardsRecorderSender>,
} }
pub struct ReplayStage { pub struct ReplayStage {
@ -181,7 +181,7 @@ impl ReplayStage {
snapshot_package_sender, snapshot_package_sender,
block_commitment_cache, block_commitment_cache,
transaction_status_sender, transaction_status_sender,
rewards_sender, rewards_recorder_sender,
} = config; } = config;
let (root_bank_sender, root_bank_receiver) = channel(); let (root_bank_sender, root_bank_receiver) = channel();
@ -222,7 +222,7 @@ impl ReplayStage {
&bank_forks, &bank_forks,
&leader_schedule_cache, &leader_schedule_cache,
&subscriptions, &subscriptions,
rewards_sender.clone(), rewards_recorder_sender.clone(),
); );
datapoint_debug!( datapoint_debug!(
"replay_stage-memory", "replay_stage-memory",
@ -399,7 +399,7 @@ impl ReplayStage {
&poh_recorder, &poh_recorder,
&leader_schedule_cache, &leader_schedule_cache,
&subscriptions, &subscriptions,
rewards_sender.clone(), rewards_recorder_sender.clone(),
); );
if let Some(bank) = poh_recorder.lock().unwrap().bank() { if let Some(bank) = poh_recorder.lock().unwrap().bank() {
@ -473,7 +473,7 @@ impl ReplayStage {
poh_recorder: &Arc<Mutex<PohRecorder>>, poh_recorder: &Arc<Mutex<PohRecorder>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
rewards_sender: Option<RewardsRecorderSender>, rewards_recorder_sender: Option<RewardsRecorderSender>,
) { ) {
// all the individual calls to poh_recorder.lock() are designed to // all the individual calls to poh_recorder.lock() are designed to
// increase granularity, decrease contention // increase granularity, decrease contention
@ -539,7 +539,7 @@ impl ReplayStage {
.unwrap() .unwrap()
.insert(Bank::new_from_parent(&parent, my_pubkey, poh_slot)); .insert(Bank::new_from_parent(&parent, my_pubkey, poh_slot));
Self::record_rewards(&tpu_bank, &rewards_sender); Self::record_rewards(&tpu_bank, &rewards_recorder_sender);
poh_recorder.lock().unwrap().set_bank(&tpu_bank); poh_recorder.lock().unwrap().set_bank(&tpu_bank);
} else { } else {
error!("{} No next leader found", my_pubkey); error!("{} No next leader found", my_pubkey);
@ -983,7 +983,7 @@ impl ReplayStage {
forks_lock: &RwLock<BankForks>, forks_lock: &RwLock<BankForks>,
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
subscriptions: &Arc<RpcSubscriptions>, subscriptions: &Arc<RpcSubscriptions>,
rewards_sender: Option<RewardsRecorderSender>, rewards_recorder_sender: Option<RewardsRecorderSender>,
) { ) {
// Find the next slot that chains to the old slot // Find the next slot that chains to the old slot
let forks = forks_lock.read().unwrap(); let forks = forks_lock.read().unwrap();
@ -1021,7 +1021,7 @@ impl ReplayStage {
subscriptions.notify_slot(child_slot, parent_slot, forks.root()); subscriptions.notify_slot(child_slot, parent_slot, forks.root());
let child_bank = Bank::new_from_parent(&parent_bank, &leader, child_slot); let child_bank = Bank::new_from_parent(&parent_bank, &leader, child_slot);
Self::record_rewards(&child_bank, &rewards_sender); Self::record_rewards(&child_bank, &rewards_recorder_sender);
new_banks.insert(child_slot, child_bank); new_banks.insert(child_slot, child_bank);
} }
} }
@ -1033,12 +1033,12 @@ impl ReplayStage {
} }
} }
fn record_rewards(bank: &Bank, rewards_sender: &Option<RewardsRecorderSender>) { fn record_rewards(bank: &Bank, rewards_recorder_sender: &Option<RewardsRecorderSender>) {
if let Some(rewards_sender) = rewards_sender { if let Some(rewards_recorder_sender) = rewards_recorder_sender {
if let Some(ref rewards) = bank.rewards { if let Some(ref rewards) = bank.rewards {
rewards_sender rewards_recorder_sender
.send((bank.slot(), rewards.iter().copied().collect())) .send((bank.slot(), rewards.iter().copied().collect()))
.unwrap_or_else(|err| warn!("rewards_sender failed: {:?}", err)); .unwrap_or_else(|err| warn!("rewards_recorder_sender failed: {:?}", err));
} }
} }
} }

View File

@ -47,7 +47,8 @@ fn new_response<T>(bank: &Bank, value: T) -> RpcResponse<T> {
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
pub struct JsonRpcConfig { pub struct JsonRpcConfig {
pub enable_validator_exit: bool, // Enable the 'validatorExit' command pub enable_validator_exit: bool,
pub enable_get_confirmed_block: bool,
pub faucet_addr: Option<SocketAddr>, pub faucet_addr: Option<SocketAddr>,
} }
@ -330,7 +331,11 @@ impl JsonRpcRequestProcessor {
slot: Slot, slot: Slot,
encoding: Option<RpcTransactionEncoding>, encoding: Option<RpcTransactionEncoding>,
) -> Result<Option<RpcConfirmedBlock>> { ) -> Result<Option<RpcConfirmedBlock>> {
if self.config.enable_get_confirmed_block {
Ok(self.blockstore.get_confirmed_block(slot, encoding).ok()) Ok(self.blockstore.get_confirmed_block(slot, encoding).ok())
} else {
Ok(None)
}
} }
pub fn get_confirmed_blocks( pub fn get_confirmed_blocks(
@ -1240,7 +1245,10 @@ pub mod tests {
let _ = bank.process_transaction(&tx); let _ = bank.process_transaction(&tx);
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
JsonRpcConfig::default(), JsonRpcConfig {
enable_get_confirmed_block: true,
..JsonRpcConfig::default()
},
bank_forks.clone(), bank_forks.clone(),
block_commitment_cache.clone(), block_commitment_cache.clone(),
blockstore, blockstore,

View File

@ -87,7 +87,7 @@ impl Tvu {
cfg: Option<Arc<AtomicBool>>, cfg: Option<Arc<AtomicBool>>,
shred_version: u16, shred_version: u16,
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
rewards_sender: Option<RewardsRecorderSender>, rewards_recorder_sender: Option<RewardsRecorderSender>,
) -> Self { ) -> Self {
let keypair: Arc<Keypair> = cluster_info let keypair: Arc<Keypair> = cluster_info
.read() .read()
@ -172,7 +172,7 @@ impl Tvu {
snapshot_package_sender, snapshot_package_sender,
block_commitment_cache, block_commitment_cache,
transaction_status_sender, transaction_status_sender,
rewards_sender, rewards_recorder_sender,
}; };
let (replay_stage, root_bank_receiver) = ReplayStage::new( let (replay_stage, root_bank_receiver) = ReplayStage::new(

View File

@ -61,7 +61,6 @@ pub struct ValidatorConfig {
pub expected_genesis_hash: Option<Hash>, pub expected_genesis_hash: Option<Hash>,
pub expected_shred_version: Option<u16>, pub expected_shred_version: Option<u16>,
pub voting_disabled: bool, pub voting_disabled: bool,
pub transaction_status_service_disabled: bool,
pub blockstream_unix_socket: Option<PathBuf>, pub blockstream_unix_socket: Option<PathBuf>,
pub storage_slots_per_turn: u64, pub storage_slots_per_turn: u64,
pub account_paths: Vec<PathBuf>, pub account_paths: Vec<PathBuf>,
@ -84,7 +83,6 @@ impl Default for ValidatorConfig {
expected_genesis_hash: None, expected_genesis_hash: None,
expected_shred_version: None, expected_shred_version: None,
voting_disabled: false, voting_disabled: false,
transaction_status_service_disabled: false,
blockstream_unix_socket: None, blockstream_unix_socket: None,
storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN, storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN,
max_ledger_slots: None, max_ledger_slots: None,
@ -256,7 +254,7 @@ impl Validator {
}); });
let (transaction_status_sender, transaction_status_service) = let (transaction_status_sender, transaction_status_service) =
if rpc_service.is_some() && !config.transaction_status_service_disabled { if rpc_service.is_some() && config.rpc_config.enable_get_confirmed_block {
let (transaction_status_sender, transaction_status_receiver) = unbounded(); let (transaction_status_sender, transaction_status_receiver) = unbounded();
( (
Some(transaction_status_sender), Some(transaction_status_sender),
@ -270,11 +268,11 @@ impl Validator {
(None, None) (None, None)
}; };
let (rewards_sender, rewards_recorder_service) = let (rewards_recorder_sender, rewards_recorder_service) =
if rpc_service.is_some() && !config.transaction_status_service_disabled { if rpc_service.is_some() && config.rpc_config.enable_get_confirmed_block {
let (rewards_sender, rewards_receiver) = unbounded(); let (rewards_recorder_sender, rewards_receiver) = unbounded();
( (
Some(rewards_sender), Some(rewards_recorder_sender),
Some(RewardsRecorderService::new( Some(RewardsRecorderService::new(
rewards_receiver, rewards_receiver,
blockstore.clone(), blockstore.clone(),
@ -405,7 +403,7 @@ impl Validator {
config.enable_partition.clone(), config.enable_partition.clone(),
node.info.shred_version, node.info.shred_version,
transaction_status_sender.clone(), transaction_status_sender.clone(),
rewards_sender, rewards_recorder_sender,
); );
if config.dev_sigverify_disabled { if config.dev_sigverify_disabled {
@ -647,7 +645,6 @@ pub fn new_validator_for_tests_ex(
let leader_voting_keypair = Arc::new(voting_keypair); let leader_voting_keypair = Arc::new(voting_keypair);
let storage_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new());
let config = ValidatorConfig { let config = ValidatorConfig {
transaction_status_service_disabled: true,
rpc_ports: Some((node.info.rpc.port(), node.info.rpc_pubsub.port())), rpc_ports: Some((node.info.rpc.port(), node.info.rpc_pubsub.port())),
..ValidatorConfig::default() ..ValidatorConfig::default()
}; };
@ -748,7 +745,6 @@ mod tests {
let voting_keypair = Arc::new(Keypair::new()); let voting_keypair = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new());
let config = ValidatorConfig { let config = ValidatorConfig {
transaction_status_service_disabled: true,
rpc_ports: Some(( rpc_ports: Some((
validator_node.info.rpc.port(), validator_node.info.rpc.port(),
validator_node.info.rpc_pubsub.port(), validator_node.info.rpc_pubsub.port(),
@ -788,7 +784,6 @@ mod tests {
let voting_keypair = Arc::new(Keypair::new()); let voting_keypair = Arc::new(Keypair::new());
let storage_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new());
let config = ValidatorConfig { let config = ValidatorConfig {
transaction_status_service_disabled: true,
rpc_ports: Some(( rpc_ports: Some((
validator_node.info.rpc.port(), validator_node.info.rpc.port(),
validator_node.info.rpc_pubsub.port(), validator_node.info.rpc_pubsub.port(),

View File

@ -211,7 +211,6 @@ impl LocalCluster {
leader_node.info.rpc.port(), leader_node.info.rpc.port(),
leader_node.info.rpc_pubsub.port(), leader_node.info.rpc_pubsub.port(),
)); ));
leader_config.transaction_status_service_disabled = true;
let leader_server = Validator::new( let leader_server = Validator::new(
leader_node, leader_node,
&leader_keypair, &leader_keypair,
@ -359,7 +358,6 @@ impl LocalCluster {
validator_node.info.rpc.port(), validator_node.info.rpc.port(),
validator_node.info.rpc_pubsub.port(), validator_node.info.rpc_pubsub.port(),
)); ));
config.transaction_status_service_disabled = true;
let voting_keypair = Arc::new(voting_keypair); let voting_keypair = Arc::new(voting_keypair);
let validator_server = Validator::new( let validator_server = Validator::new(
validator_node, validator_node,
@ -668,9 +666,6 @@ impl Cluster for LocalCluster {
cluster_validator_info.info.contact_info = node.info.clone(); cluster_validator_info.info.contact_info = node.info.clone();
cluster_validator_info.config.rpc_ports = cluster_validator_info.config.rpc_ports =
Some((node.info.rpc.port(), node.info.rpc_pubsub.port())); Some((node.info.rpc.port(), node.info.rpc_pubsub.port()));
cluster_validator_info
.config
.transaction_status_service_disabled = true;
let entry_point_info = { let entry_point_info = {
if *pubkey == self.entry_point_info.id { if *pubkey == self.entry_point_info.id {

View File

@ -285,6 +285,7 @@ EOF
--blockstream /tmp/solana-blockstream.sock --blockstream /tmp/solana-blockstream.sock
--no-voting --no-voting
--dev-no-sigverify --dev-no-sigverify
--enable-rpc-get-confirmed-block
) )
else else
args+=(--enable-rpc-exit) args+=(--enable-rpc-exit)

1
run.sh
View File

@ -112,6 +112,7 @@ args=(
--rpc-faucet-address 127.0.0.1:9900 --rpc-faucet-address 127.0.0.1:9900
--log - --log -
--enable-rpc-exit --enable-rpc-exit
--enable-rpc-get-confirmed-block
--init-complete-file "$dataDir"/init-completed --init-complete-file "$dataDir"/init-completed
) )
if [[ -n $blockstreamSocket ]]; then if [[ -n $blockstreamSocket ]]; then

View File

@ -507,6 +507,12 @@ pub fn main() {
.takes_value(false) .takes_value(false)
.help("Enable the JSON RPC 'validatorExit' API. Only enable in a debug environment"), .help("Enable the JSON RPC 'validatorExit' API. Only enable in a debug environment"),
) )
.arg(
Arg::with_name("enable_rpc_get_confirmed_block")
.long("enable-rpc-get-confirmed-block")
.takes_value(false)
.help("Enable the JSON RPC 'getConfirmedBlock' API. This will cause an increase in disk usage and IOPS"),
)
.arg( .arg(
Arg::with_name("rpc_faucet_addr") Arg::with_name("rpc_faucet_addr")
.long("rpc-faucet-address") .long("rpc-faucet-address")
@ -684,6 +690,7 @@ pub fn main() {
new_hard_forks: hardforks_of(&matches, "hard_forks"), new_hard_forks: hardforks_of(&matches, "hard_forks"),
rpc_config: JsonRpcConfig { rpc_config: JsonRpcConfig {
enable_validator_exit: matches.is_present("enable_rpc_exit"), enable_validator_exit: matches.is_present("enable_rpc_exit"),
enable_get_confirmed_block: matches.is_present("enable_rpc_get_confirmed_block"),
faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| { faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| {
solana_net_utils::parse_host_port(address).expect("failed to parse faucet address") solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")
}), }),