Spawn threads based on cpu count (#2232)

This commit is contained in:
Sathish 2018-12-21 13:55:45 -08:00 committed by GitHub
parent 41f8764232
commit 1a3387706d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 7 deletions

View File

@ -56,6 +56,7 @@ solana-native-loader = { path = "programs/native/native_loader", version = "0.12
solana-netutil = { path = "netutil", version = "0.12.0" } solana-netutil = { path = "netutil", version = "0.12.0" }
solana-sdk = { path = "sdk", version = "0.12.0" } solana-sdk = { path = "sdk", version = "0.12.0" }
solana-system-program = { path = "programs/native/system", version = "0.12.0" } solana-system-program = { path = "programs/native/system", version = "0.12.0" }
sys-info = "0.5.6"
tokio = "0.1" tokio = "0.1"
tokio-codec = "0.1" tokio-codec = "0.1"
untrusted = "0.6.2" untrusted = "0.6.2"

View File

@ -5,7 +5,7 @@ extern crate test;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use rayon::prelude::*; use rayon::prelude::*;
use solana::bank::Bank; use solana::bank::Bank;
use solana::banking_stage::{BankingStage, NUM_THREADS}; use solana::banking_stage::BankingStage;
use solana::entry::Entry; use solana::entry::Entry;
use solana::mint::Mint; use solana::mint::Mint;
use solana::packet::to_packets_chunked; use solana::packet::to_packets_chunked;
@ -41,7 +41,8 @@ fn check_txs(receiver: &Receiver<Vec<Entry>>, ref_tx_count: usize) {
#[bench] #[bench]
fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) { fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let txes = 1000 * NUM_THREADS; let num_threads = BankingStage::num_threads() as usize;
let txes = 1000 * num_threads;
let mint_total = 1_000_000_000_000; let mint_total = 1_000_000_000_000;
let mint = Mint::new(mint_total); let mint = Mint::new(mint_total);
@ -119,7 +120,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
if bank.count_valid_ids(&[mint.last_id()]).len() == 0 { if bank.count_valid_ids(&[mint.last_id()]).len() == 0 {
bank.register_tick(&mint.last_id()); bank.register_tick(&mint.last_id());
} }
for v in verified.chunks(verified.len() / NUM_THREADS) { for v in verified.chunks(verified.len() / num_threads) {
verified_sender.send(v.to_vec()).unwrap(); verified_sender.send(v.to_vec()).unwrap();
} }
check_txs(&signal_receiver, txes); check_txs(&signal_receiver, txes);
@ -130,7 +131,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
#[bench] #[bench]
fn bench_banking_stage_multi_programs(bencher: &mut Bencher) { fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
let progs = 4; let progs = 4;
let txes = 1000 * NUM_THREADS; let num_threads = BankingStage::num_threads() as usize;
let txes = 1000 * num_threads;
let mint_total = 1_000_000_000_000; let mint_total = 1_000_000_000_000;
let mint = Mint::new(mint_total); let mint = Mint::new(mint_total);
@ -223,7 +225,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
if bank.count_valid_ids(&[mint.last_id()]).len() == 0 { if bank.count_valid_ids(&[mint.last_id()]).len() == 0 {
bank.register_tick(&mint.last_id()); bank.register_tick(&mint.last_id());
} }
for v in verified.chunks(verified.len() / NUM_THREADS) { for v in verified.chunks(verified.len() / num_threads) {
verified_sender.send(v.to_vec()).unwrap(); verified_sender.send(v.to_vec()).unwrap();
} }
check_txs(&signal_receiver, txes); check_txs(&signal_receiver, txes);

View File

@ -25,6 +25,7 @@ use std::sync::{Arc, Mutex};
use std::thread::{self, Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use sys_info;
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]
pub enum BankingStageReturnType { pub enum BankingStageReturnType {
@ -33,7 +34,7 @@ pub enum BankingStageReturnType {
} }
// number of threads is 1 until mt bank is ready // number of threads is 1 until mt bank is ready
pub const NUM_THREADS: usize = 10; pub const NUM_THREADS: u32 = 10;
/// Stores the stage's thread handle and output receiver. /// Stores the stage's thread handle and output receiver.
pub struct BankingStage { pub struct BankingStage {
@ -72,7 +73,8 @@ impl BankingStage {
); );
// Many banks that process transactions in parallel. // Many banks that process transactions in parallel.
let bank_thread_hdls: Vec<JoinHandle<Option<BankingStageReturnType>>> = (0..NUM_THREADS) let bank_thread_hdls: Vec<JoinHandle<Option<BankingStageReturnType>>> = (0
..Self::num_threads())
.map(|_| { .map(|_| {
let thread_bank = bank.clone(); let thread_bank = bank.clone();
let thread_verified_receiver = shared_verified_receiver.clone(); let thread_verified_receiver = shared_verified_receiver.clone();
@ -126,6 +128,10 @@ impl BankingStage {
) )
} }
pub fn num_threads() -> u32 {
sys_info::cpu_num().unwrap_or(NUM_THREADS)
}
/// Convert the transactions from a blob of binary data to a vector of transactions and /// Convert the transactions from a blob of binary data to a vector of transactions and
/// an unused `SocketAddr` that could be used to send a response. /// an unused `SocketAddr` that could be used to send a response.
fn deserialize_transactions(p: &Packets) -> Vec<Option<(Transaction, SocketAddr)>> { fn deserialize_transactions(p: &Packets) -> Vec<Option<(Transaction, SocketAddr)>> {