Add InstalledScheduler for blockstore_processor (#33875)

* Add InstalledScheduler for blockstore_processor

* Reverse if clauses

* Add more comments for process_batches()

* Elaborate comment

* Simplify schedule_transaction_executions type
This commit is contained in:
Ryo Onodera 2023-10-27 21:42:18 +09:00 committed by GitHub
parent d04ad6557d
commit 950ca5ea86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 329 additions and 41 deletions

40
Cargo.lock generated
View File

@ -1699,6 +1699,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "downcast"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
[[package]]
name = "eager"
version = "0.1.0"
@ -1984,6 +1990,12 @@ dependencies = [
"percent-encoding 2.3.0",
]
[[package]]
name = "fragile"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "fs-err"
version = "2.9.0"
@ -3137,6 +3149,33 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "mockall"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96"
dependencies = [
"cfg-if 1.0.0",
"downcast",
"fragile",
"lazy_static",
"mockall_derive",
"predicates",
"predicates-tree",
]
[[package]]
name = "mockall_derive"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb"
dependencies = [
"cfg-if 1.0.0",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "modular-bitfield"
version = "0.11.2"
@ -6929,6 +6968,7 @@ dependencies = [
"lz4",
"memmap2",
"memoffset 0.9.0",
"mockall",
"modular-bitfield",
"num-derive 0.4.1",
"num-traits",

View File

@ -243,6 +243,7 @@ memmap2 = "0.5.10"
memoffset = "0.9"
merlin = "3"
min-max-heap = "1.3.0"
mockall = "0.11.4"
modular-bitfield = "0.11.2"
nix = "0.26.4"
num-bigint = "0.4.4"

View File

@ -294,6 +294,70 @@ fn execute_batches_internal(
})
}
// This fn diverts the code-path into two variants. Both must provide exactly the same set of
// validations. For this reason, this fn is deliberately inserted into the code path to be called
// inside process_entries(), so that Bank::prepare_sanitized_batch() has been called on all of
// batches already, while minimizing code duplication (thus divergent behavior risk) at the cost of
// acceptable overhead of meaningless buffering of batches for the scheduler variant.
//
// Also note that the scheduler variant can't implement the batch-level sanitization naively, due
// to the nature of individual tx processing. That's another reason of this particular placement of
// divergent point in the code-path (i.e. not one layer up with its own prepare_sanitized_batch()
// invocation).
fn process_batches(
bank: &BankWithScheduler,
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
batch_execution_timing: &mut BatchExecutionTiming,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Result<()> {
if bank.has_installed_scheduler() {
debug!(
"process_batches()/schedule_batches_for_execution({} batches)",
batches.len()
);
// scheduling always succeeds here without being blocked on actual transaction executions.
// The transaction execution errors will be collected via the blocking fn called
// BankWithScheduler::wait_for_completed_scheduler(), if any.
schedule_batches_for_execution(bank, batches);
Ok(())
} else {
debug!(
"process_batches()/rebatch_and_execute_batches({} batches)",
batches.len()
);
rebatch_and_execute_batches(
bank,
batches,
transaction_status_sender,
replay_vote_sender,
batch_execution_timing,
log_messages_bytes_limit,
prioritization_fee_cache,
)
}
}
fn schedule_batches_for_execution(
bank: &BankWithScheduler,
batches: &[TransactionBatchWithIndexes],
) {
for TransactionBatchWithIndexes {
batch,
transaction_indexes,
} in batches
{
bank.schedule_transaction_executions(
batch
.sanitized_transactions()
.iter()
.zip(transaction_indexes.iter()),
);
}
}
fn rebatch_transactions<'a>(
lock_results: &'a [Result<()>],
bank: &'a Arc<Bank>,
@ -314,7 +378,7 @@ fn rebatch_transactions<'a>(
}
}
fn execute_batches(
fn rebatch_and_execute_batches(
bank: &Arc<Bank>,
batches: &[TransactionBatchWithIndexes],
transaction_status_sender: Option<&TransactionStatusSender>,
@ -488,7 +552,7 @@ fn process_entries(
if bank.is_block_boundary(bank.tick_height() + tick_hashes.len() as u64) {
// If it's a tick that will cause a new blockhash to be created,
// execute the group and register the tick
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
@ -541,7 +605,7 @@ fn process_entries(
} else {
// else we have an entry that conflicts with a prior entry
// execute the current queue and try to process this entry again
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
@ -556,7 +620,7 @@ fn process_entries(
}
}
}
execute_batches(
process_batches(
bank,
&batches,
transaction_status_sender,
@ -1856,9 +1920,12 @@ pub mod tests {
rand::{thread_rng, Rng},
solana_entry::entry::{create_ticks, next_entry, next_entry_mut},
solana_program_runtime::declare_process_instruction,
solana_runtime::genesis_utils::{
solana_runtime::{
genesis_utils::{
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
},
installed_scheduler_pool::MockInstalledScheduler,
},
solana_sdk::{
account::{AccountSharedData, WritableAccount},
epoch_schedule::EpochSchedule,
@ -4245,6 +4312,38 @@ pub mod tests {
)
}
fn create_test_transactions(
mint_keypair: &Keypair,
genesis_hash: &Hash,
) -> Vec<SanitizedTransaction> {
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
let keypair3 = Keypair::new();
let pubkey3 = solana_sdk::pubkey::new_rand();
vec![
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
mint_keypair,
&pubkey,
1,
*genesis_hash,
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair2,
&pubkey2,
1,
*genesis_hash,
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair3,
&pubkey3,
1,
*genesis_hash,
)),
]
}
#[test]
fn test_confirm_slot_entries_progress_num_txs_indexes() {
let GenesisConfigInfo {
@ -4368,34 +4467,7 @@ pub mod tests {
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
let keypair3 = Keypair::new();
let pubkey3 = solana_sdk::pubkey::new_rand();
let txs = vec![
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&mint_keypair,
&pubkey,
1,
genesis_config.hash(),
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair2,
&pubkey2,
1,
genesis_config.hash(),
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair3,
&pubkey3,
1,
genesis_config.hash(),
)),
];
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
let batch = bank.prepare_sanitized_batch(&txs);
assert!(batch.needs_unlock());
let transaction_indexes = vec![42, 43, 44];
@ -4424,6 +4496,46 @@ pub mod tests {
assert_eq!(batch3.transaction_indexes, vec![43, 44]);
}
#[test]
fn test_schedule_batches_for_execution() {
solana_logger::setup();
let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
let mut mocked_scheduler = MockInstalledScheduler::new();
mocked_scheduler
.expect_schedule_execution()
.times(txs.len())
.returning(|_| ());
let bank = BankWithScheduler::new(bank, Some(Box::new(mocked_scheduler)));
let batch = bank.prepare_sanitized_batch(&txs);
let batch_with_indexes = TransactionBatchWithIndexes {
batch,
transaction_indexes: (0..txs.len()).collect(),
};
let mut batch_execution_timing = BatchExecutionTiming::default();
let ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
assert!(process_batches(
&bank,
&[batch_with_indexes],
None,
None,
&mut batch_execution_timing,
None,
&ignored_prioritization_fee_cache
)
.is_ok());
}
#[test]
fn test_confirm_slot_entries_with_fix() {
const HASHES_PER_TICK: u64 = 10;

View File

@ -1306,6 +1306,12 @@ dependencies = [
"zeroize",
]
[[package]]
name = "difflib"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
[[package]]
name = "digest"
version = "0.8.1"
@ -1399,6 +1405,12 @@ dependencies = [
"syn 2.0.38",
]
[[package]]
name = "downcast"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
[[package]]
name = "eager"
version = "0.1.0"
@ -1643,6 +1655,15 @@ dependencies = [
"miniz_oxide",
]
[[package]]
name = "float-cmp"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4"
dependencies = [
"num-traits",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -1673,6 +1694,12 @@ dependencies = [
"percent-encoding 2.3.0",
]
[[package]]
name = "fragile"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "fs-err"
version = "2.9.0"
@ -2787,6 +2814,33 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "mockall"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96"
dependencies = [
"cfg-if 1.0.0",
"downcast",
"fragile",
"lazy_static",
"mockall_derive",
"predicates",
"predicates-tree",
]
[[package]]
name = "mockall_derive"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb"
dependencies = [
"cfg-if 1.0.0",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "modular-bitfield"
version = "0.11.2"
@ -2866,6 +2920,12 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "normalize-line-endings"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
[[package]]
name = "num"
version = "0.2.1"
@ -3438,6 +3498,36 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea"
[[package]]
name = "predicates"
version = "2.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd"
dependencies = [
"difflib",
"float-cmp",
"itertools",
"normalize-line-endings",
"predicates-core",
"regex",
]
[[package]]
name = "predicates-core"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174"
[[package]]
name = "predicates-tree"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf"
dependencies = [
"predicates-core",
"termtree",
]
[[package]]
name = "pretty-hex"
version = "0.3.0"
@ -5579,6 +5669,7 @@ dependencies = [
"lru",
"lz4",
"memmap2",
"mockall",
"modular-bitfield",
"num-derive 0.4.1",
"num-traits",
@ -6984,6 +7075,12 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "termtree"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "test-case"
version = "3.2.1"

View File

@ -32,6 +32,7 @@ log = { workspace = true }
lru = { workspace = true }
lz4 = { workspace = true }
memmap2 = { workspace = true }
mockall = { workspace = true }
modular-bitfield = { workspace = true }
num-derive = { workspace = true }
num-traits = { workspace = true }

View File

@ -1,19 +1,35 @@
//! Currently, there's only one auxiliary type called BankWithScheduler.. This file will be
//! populated by later PRs to align with the filename.
//! Currently, there are only two things: minimal InstalledScheduler trait and an auxiliary type
//! called BankWithScheduler.. This file will be populated by later PRs to align with the filename.
#[cfg(feature = "dev-context-only-utils")]
use qualifier_attr::qualifiers;
use {
crate::bank::Bank,
log::*,
solana_sdk::transaction::SanitizedTransaction,
std::{
fmt::Debug,
ops::Deref,
sync::{Arc, RwLock},
},
};
#[cfg(feature = "dev-context-only-utils")]
use {mockall::automock, qualifier_attr::qualifiers};
// currently dummy type; will be replaced with the introduction of real type by upcoming pr...
pub type DefaultInstalledSchedulerBox = ();
#[cfg_attr(feature = "dev-context-only-utils", automock)]
// suppress false clippy complaints arising from mockall-derive:
// warning: `#[must_use]` has no effect when applied to a struct field
// warning: the following explicit lifetimes could be elided: 'a
#[cfg_attr(
feature = "dev-context-only-utils",
allow(unused_attributes, clippy::needless_lifetimes)
)]
pub trait InstalledScheduler: Send + Sync + Debug + 'static {
fn schedule_execution<'a>(
&'a self,
transaction_with_index: &'a (&'a SanitizedTransaction, usize),
);
}
pub type DefaultInstalledSchedulerBox = Box<dyn InstalledScheduler>;
/// Very thin wrapper around Arc<Bank>
///
@ -40,7 +56,6 @@ pub struct BankWithScheduler {
#[derive(Debug)]
pub struct BankWithSchedulerInner {
bank: Arc<Bank>,
#[allow(dead_code)]
scheduler: InstalledSchedulerRwLock,
}
pub type InstalledSchedulerRwLock = RwLock<Option<DefaultInstalledSchedulerBox>>;
@ -70,6 +85,28 @@ impl BankWithScheduler {
self.inner.bank.clone()
}
pub fn has_installed_scheduler(&self) -> bool {
self.inner.scheduler.read().unwrap().is_some()
}
// 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet...
pub fn schedule_transaction_executions<'a>(
&self,
transactions_with_indexes: impl ExactSizeIterator<Item = (&'a SanitizedTransaction, &'a usize)>,
) {
trace!(
"schedule_transaction_executions(): {} txs",
transactions_with_indexes.len()
);
let scheduler_guard = self.inner.scheduler.read().unwrap();
let scheduler = scheduler_guard.as_ref().unwrap();
for (sanitized_transaction, &index) in transactions_with_indexes {
scheduler.schedule_execution(&(sanitized_transaction, index));
}
}
pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
RwLock::new(None)
}