Add more slot update notifications (#15734)

* Add more slot update notifications

* fix merge

* Address feedback and add integration test

* switch to datapoint

* remove unused shred method

* fix clippy

* new thread for rpc completed slots

* remove extra constant

* fixes

* rely on channel closing

* fix check
Justin Starry authored 2021-03-12 21:44:06 +08:00, committed by GitHub
parent 28c27893b9
commit 918d04e3f0
15 changed files with 340 additions and 98 deletions

View File

@@ -101,13 +101,63 @@ pub struct SlotInfo {
pub root: Slot,
}
#[derive(Serialize, Deserialize, Clone, Copy, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct SlotTransactionStats {
pub num_transaction_entries: u64,
pub num_successful_transactions: u64,
pub num_failed_transactions: u64,
pub max_transactions_per_entry: u64,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum SlotUpdate {
OptimisticConfirmation { slot: Slot, timestamp: u64 },
FirstShredReceived { slot: Slot, timestamp: u64 },
Frozen { slot: Slot, timestamp: u64 },
Root { slot: Slot, timestamp: u64 },
FirstShredReceived {
slot: Slot,
timestamp: u64,
},
Completed {
slot: Slot,
timestamp: u64,
},
CreatedBank {
slot: Slot,
parent: Slot,
timestamp: u64,
},
Frozen {
slot: Slot,
timestamp: u64,
stats: SlotTransactionStats,
},
Dead {
slot: Slot,
timestamp: u64,
err: String,
},
OptimisticConfirmation {
slot: Slot,
timestamp: u64,
},
Root {
slot: Slot,
timestamp: u64,
},
}
impl SlotUpdate {
pub fn slot(&self) -> Slot {
match self {
Self::FirstShredReceived { slot, .. } => *slot,
Self::Completed { slot, .. } => *slot,
Self::CreatedBank { slot, .. } => *slot,
Self::Frozen { slot, .. } => *slot,
Self::Dead { slot, .. } => *slot,
Self::OptimisticConfirmation { slot, .. } => *slot,
Self::Root { slot, .. } => *slot,
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
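For subscribers, the #[serde(rename_all = "camelCase", tag = "type")] attributes on SlotUpdate above imply an internally tagged, camelCase wire format for slotsUpdatesSubscribe notifications. A minimal serialization sketch with illustrative values (assumes serde_json is available; not part of this commit):

use solana_client::rpc_response::{SlotTransactionStats, SlotUpdate};

fn main() {
    // Prints (wrapped here for readability):
    // {"type":"frozen","slot":100,"timestamp":1615550000000,
    //  "stats":{"numTransactionEntries":64,"numSuccessfulTransactions":1200,
    //           "numFailedTransactions":3,"maxTransactionsPerEntry":48}}
    let update = SlotUpdate::Frozen {
        slot: 100,
        timestamp: 1_615_550_000_000,
        stats: SlotTransactionStats {
            num_transaction_entries: 64,
            num_successful_transactions: 1200,
            num_failed_transactions: 3,
            max_transactions_per_entry: 48,
        },
    };
    println!("{}", serde_json::to_string(&update).unwrap());
}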

View File

@@ -6,9 +6,9 @@ use solana_sdk::{clock::Slot, pubkey::Pubkey};
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::RecvTimeoutError,
{Arc, RwLock},
},
thread::sleep,
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
};
@@ -79,6 +79,14 @@ impl ClusterSlotsService {
if exit.load(Ordering::Relaxed) {
break;
}
let slots = match completed_slots_receiver.recv_timeout(Duration::from_millis(200)) {
Ok(slots) => Some(slots),
Err(RecvTimeoutError::Timeout) => None,
Err(RecvTimeoutError::Disconnected) => {
warn!("Cluster slots service - sender disconnected");
break;
}
};
let new_root = bank_forks.read().unwrap().root();
let id = cluster_info.id();
let mut lowest_slot_elapsed = Measure::start("lowest_slot_elapsed");
@@ -87,7 +95,9 @@ impl ClusterSlotsService {
lowest_slot_elapsed.stop();
let mut update_completed_slots_elapsed =
Measure::start("update_completed_slots_elapsed");
Self::update_completed_slots(&completed_slots_receiver, &cluster_info);
if let Some(slots) = slots {
Self::update_completed_slots(slots, &completed_slots_receiver, &cluster_info);
}
cluster_slots.update(new_root, &cluster_info, &bank_forks);
update_completed_slots_elapsed.stop();
@@ -113,20 +123,20 @@ impl ClusterSlotsService {
cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
last_stats = Instant::now();
}
sleep(Duration::from_millis(200));
}
}
fn update_completed_slots(
mut slots: Vec<Slot>,
completed_slots_receiver: &CompletedSlotsReceiver,
cluster_info: &ClusterInfo,
) {
let mut slots: Vec<Slot> = vec![];
while let Ok(mut more) = completed_slots_receiver.try_recv() {
slots.append(&mut more);
}
#[allow(clippy::stable_sort_primitive)]
slots.sort();
if !slots.is_empty() {
cluster_info.push_epoch_slots(&slots);
}

View File

@@ -58,6 +58,7 @@ mod result;
pub mod retransmit_stage;
pub mod rewards_recorder_service;
pub mod rpc;
pub mod rpc_completed_slots_service;
pub mod rpc_health;
pub mod rpc_pubsub;
pub mod rpc_pubsub_service;

View File

@@ -4,7 +4,7 @@
use crate::rpc_subscriptions::RpcSubscriptions;
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_client::rpc_response::SlotUpdate;
use solana_client::rpc_response::{SlotTransactionStats, SlotUpdate};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{clock::Slot, timing::timestamp};
use std::{
@@ -140,6 +140,22 @@ impl OptimisticallyConfirmedBankTracker {
}
BankNotification::Frozen(bank) => {
let frozen_slot = bank.slot();
if let Some(parent) = bank.parent() {
let num_successful_transactions = bank
.transaction_count()
.saturating_sub(parent.transaction_count());
subscriptions.notify_slot_update(SlotUpdate::Frozen {
slot: frozen_slot,
timestamp: timestamp(),
stats: SlotTransactionStats {
num_transaction_entries: bank.transaction_entries_count(),
num_successful_transactions,
num_failed_transactions: bank.transaction_error_count(),
max_transactions_per_entry: bank.transactions_per_entry_max(),
},
});
}
if pending_optimistically_confirmed_banks.remove(&bank.slot()) {
let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
@@ -149,10 +165,6 @@ impl OptimisticallyConfirmedBankTracker {
}
drop(w_optimistically_confirmed_bank);
}
subscriptions.notify_slot_update(SlotUpdate::Frozen {
slot: frozen_slot,
timestamp: timestamp(),
});
}
BankNotification::Root(bank) => {
let root_slot = bank.slot();

View File

@@ -18,6 +18,7 @@ use crate::{
rewards_recorder_service::RewardsRecorderSender,
rpc_subscriptions::RpcSubscriptions,
};
use solana_client::rpc_response::SlotUpdate;
use solana_ledger::{
block_error::BlockError,
blockstore::Blockstore,
@@ -331,6 +332,7 @@ impl ReplayStage {
&replay_vote_sender,
&bank_notification_sender,
&rewards_recorder_sender,
&subscriptions,
);
replay_active_banks_time.stop();
Self::report_memory(&allocated, "replay_active_banks", start);
@@ -997,6 +999,7 @@ impl ReplayStage {
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
verify_recyclers: &VerifyRecyclers,
subscriptions: &Arc<RpcSubscriptions>,
) -> result::Result<usize, BlockstoreProcessorError> {
let tx_count_before = bank_progress.replay_progress.num_txs;
let confirm_result = blockstore_processor::confirm_slot(
@@ -1032,7 +1035,15 @@ impl ReplayStage {
} else {
info!("Slot had too few ticks: {}", slot);
}
Self::mark_dead_slot(blockstore, bank_progress, slot, &err, is_serious);
Self::mark_dead_slot(
blockstore,
bank_progress,
slot,
&err,
is_serious,
subscriptions,
);
err
})?;
@@ -1045,6 +1056,7 @@ impl ReplayStage {
slot: Slot,
err: &BlockstoreProcessorError,
is_serious: bool,
subscriptions: &Arc<RpcSubscriptions>,
) {
if is_serious {
datapoint_error!(
@@ -1063,6 +1075,11 @@ impl ReplayStage {
blockstore
.set_dead_slot(slot)
.expect("Failed to mark slot as dead in blockstore");
subscriptions.notify_slot_update(SlotUpdate::Dead {
slot,
err: format!("error: {:?}", err),
timestamp: timestamp(),
});
}
#[allow(clippy::too_many_arguments)]
@@ -1313,6 +1330,7 @@ impl ReplayStage {
replay_vote_sender: &ReplayVoteSender,
bank_notification_sender: &Option<BankNotificationSender>,
rewards_recorder_sender: &Option<RewardsRecorderSender>,
subscriptions: &Arc<RpcSubscriptions>,
) -> bool {
let mut did_complete_bank = false;
let mut tx_count = 0;
@@ -1360,6 +1378,7 @@ impl ReplayStage {
transaction_status_sender.clone(),
replay_vote_sender,
verify_recyclers,
subscriptions,
);
match replay_result {
Ok(replay_tx_count) => tx_count += replay_tx_count,
@@ -1400,6 +1419,7 @@ impl ReplayStage {
bank.slot(),
&BlockstoreProcessorError::InvalidBlock(BlockError::DuplicateBlock),
true,
subscriptions,
);
warn!(
"{} duplicate shreds detected, not freezing bank {}",
@@ -2541,7 +2561,8 @@ pub(crate) mod tests {
..
} = create_genesis_config(1000);
genesis_config.poh_config.hashes_per_tick = Some(2);
let bank0 = Arc::new(Bank::new(&genesis_config));
let bank_forks = BankForks::new(Bank::new(&genesis_config));
let bank0 = bank_forks.working_bank();
let mut progress = ProgressMap::default();
let last_blockhash = bank0.last_blockhash();
let mut bank0_progress = progress
@@ -2549,6 +2570,9 @@ pub(crate) mod tests {
.or_insert_with(|| ForkProgress::new(last_blockhash, None, None, 0, 0));
let shreds = shred_to_insert(&mint_keypair, bank0.clone());
blockstore.insert_shreds(shreds, None, false).unwrap();
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let bank_forks = Arc::new(RwLock::new(bank_forks));
let exit = Arc::new(AtomicBool::new(false));
let res = ReplayStage::replay_blockstore_into_bank(
&bank0,
&blockstore,
@@ -2556,6 +2580,12 @@ pub(crate) mod tests {
None,
&replay_vote_sender,
&&VerifyRecyclers::default(),
&Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
block_commitment_cache,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)),
);
// Check that the erroring bank was marked as dead in the progress map

View File

@@ -12,6 +12,7 @@ use crate::{
repair_service::DuplicateSlotsResetSender,
repair_service::RepairInfo,
result::{Error, Result},
rpc_completed_slots_service::RpcCompletedSlotsService,
rpc_subscriptions::RpcSubscriptions,
window_service::{should_retransmit_and_persist, WindowService},
};
@@ -219,7 +220,7 @@ pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
pub type ShredFilterAndHasher = (ShredFilter, PacketHasher);
// Returns None if shred is already received and should skip retransmit.
// Otherwise returns shred's slot.
// Otherwise returns shred's slot and whether the shred is a data shred.
fn check_if_already_received(
packet: &Packet,
shreds_received: &Mutex<ShredFilterAndHasher>,
@@ -246,40 +247,29 @@ fn check_if_already_received(
}
}
fn notify_first_shred_received(
// Returns true if this is the first time receiving a shred for `shred_slot`.
fn check_if_first_shred_received(
shred_slot: Slot,
rpc_subscriptions: &RpcSubscriptions,
sent_received_slot_notification: &Mutex<BTreeSet<Slot>>,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
root_bank: &Bank,
) {
let notify_slot = {
let mut sent_received_slot_notification_locked =
sent_received_slot_notification.lock().unwrap();
if !sent_received_slot_notification_locked.contains(&shred_slot)
&& shred_slot > root_bank.slot()
{
sent_received_slot_notification_locked.insert(shred_slot);
if sent_received_slot_notification_locked.len() > 100 {
let mut slots_before_root =
sent_received_slot_notification_locked.split_off(&(root_bank.slot() + 1));
// `slots_before_root` now contains all slots <= root
std::mem::swap(
&mut slots_before_root,
&mut sent_received_slot_notification_locked,
);
}
Some(shred_slot)
} else {
None
}
};
) -> bool {
if shred_slot <= root_bank.slot() {
return false;
}
if let Some(slot) = notify_slot {
info!("First time receiving a shred from slot: {}", slot);
rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
slot,
timestamp: timestamp(),
});
let mut first_shreds_received_locked = first_shreds_received.lock().unwrap();
if !first_shreds_received_locked.contains(&shred_slot) {
datapoint_info!("retransmit-first-shred", ("slot", shred_slot, i64));
first_shreds_received_locked.insert(shred_slot);
if first_shreds_received_locked.len() > 100 {
let mut slots_before_root =
first_shreds_received_locked.split_off(&(root_bank.slot() + 1));
// `slots_before_root` now contains all slots <= root
std::mem::swap(&mut slots_before_root, &mut first_shreds_received_locked);
}
true
} else {
false
}
}
@@ -312,7 +302,7 @@ fn retransmit(
last_peer_update: &AtomicU64,
shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots,
sent_received_slot_notification: &Mutex<BTreeSet<Slot>>,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: &Option<Arc<RpcSubscriptions>>,
) -> Result<()> {
let timer = Duration::new(1, 0);
@@ -391,12 +381,12 @@ fn retransmit(
max_slot = max_slot.max(shred_slot);
if let Some(rpc_subscriptions) = rpc_subscriptions {
notify_first_shred_received(
shred_slot,
rpc_subscriptions,
sent_received_slot_notification,
&root_bank,
);
if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
slot: shred_slot,
timestamp: timestamp(),
});
}
}
let mut compute_turbine_peers = Measure::start("turbine_start");
@@ -504,7 +494,7 @@ pub fn retransmitter(
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
let sent_received_slot_notification = Arc::new(Mutex::new(BTreeSet::new()));
let first_shreds_received = Arc::new(Mutex::new(BTreeSet::new()));
(0..sockets.len())
.map(|s| {
let sockets = sockets.clone();
@@ -517,7 +507,7 @@ pub fn retransmitter(
let last_peer_update = Arc::new(AtomicU64::new(0));
let shreds_received = shreds_received.clone();
let max_slots = max_slots.clone();
let sent_received_slot_notification = sent_received_slot_notification.clone();
let first_shreds_received = first_shreds_received.clone();
let rpc_subscriptions = rpc_subscriptions.clone();
Builder::new()
@@ -537,7 +527,7 @@ pub fn retransmitter(
&last_peer_update,
&shreds_received,
&max_slots,
&sent_received_slot_notification,
&first_shreds_received,
&rpc_subscriptions,
) {
match e {
@@ -574,7 +564,7 @@ impl RetransmitStage {
repair_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<Packets>>,
exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver,
completed_slots_receivers: [CompletedSlotsReceiver; 2],
epoch_schedule: EpochSchedule,
cfg: Option<Arc<AtomicBool>>,
shred_version: u16,
@@ -596,18 +586,23 @@ impl RetransmitStage {
cluster_info.clone(),
retransmit_receiver,
max_slots,
rpc_subscriptions,
rpc_subscriptions.clone(),
);
let leader_schedule_cache_clone = leader_schedule_cache.clone();
let [rpc_completed_slots_receiver, cluster_completed_slots_receiver] =
completed_slots_receivers;
let rpc_completed_slots_hdl =
RpcCompletedSlotsService::spawn(rpc_completed_slots_receiver, rpc_subscriptions);
let cluster_slots_service = ClusterSlotsService::new(
blockstore.clone(),
cluster_slots.clone(),
bank_forks.clone(),
cluster_info.clone(),
completed_slots_receiver,
cluster_completed_slots_receiver,
exit.clone(),
);
let leader_schedule_cache_clone = leader_schedule_cache.clone();
let repair_info = RepairInfo {
bank_forks,
epoch_schedule,
@@ -643,7 +638,11 @@ impl RetransmitStage {
completed_data_sets_sender,
);
let thread_hdls = t_retransmit;
let mut thread_hdls = t_retransmit;
if let Some(thread_hdl) = rpc_completed_slots_hdl {
thread_hdls.push(thread_hdl);
}
Self {
thread_hdls,
window_service,

View File

@@ -0,0 +1,33 @@
use crate::rpc_subscriptions::RpcSubscriptions;
use solana_client::rpc_response::SlotUpdate;
use solana_ledger::blockstore::CompletedSlotsReceiver;
use solana_sdk::timing::timestamp;
use std::{
sync::Arc,
thread::{Builder, JoinHandle},
};
pub struct RpcCompletedSlotsService;
impl RpcCompletedSlotsService {
pub fn spawn(
completed_slots_receiver: CompletedSlotsReceiver,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Option<JoinHandle<()>> {
let rpc_subscriptions = rpc_subscriptions?;
Some(
Builder::new()
.name("solana-rpc-completed-slots-service".to_string())
.spawn(move || {
for slots in completed_slots_receiver.iter() {
for slot in slots {
rpc_subscriptions.notify_slot_update(SlotUpdate::Completed {
slot,
timestamp: timestamp(),
});
}
}
})
.unwrap(),
)
}
}
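The shutdown contract here is implicit ("rely on channel closing"): the loop over completed_slots_receiver.iter() ends once every matching sender held by the blockstore is dropped, so no exit flag is threaded through. A minimal wiring sketch under that assumption (hypothetical helper, simplified from RetransmitStage::new above; the second receiver would go to ClusterSlotsService):

use std::{path::Path, sync::Arc, thread::JoinHandle};
use solana_core::{
    rpc_completed_slots_service::RpcCompletedSlotsService, rpc_subscriptions::RpcSubscriptions,
};
use solana_ledger::blockstore::{Blockstore, BlockstoreSignals};

// Keep `blockstore` alive for as long as notifications should flow: dropping it
// drops its completed-slots senders, which closes the channel and ends the thread.
fn wire_rpc_completed_slots(
    ledger_path: &Path,
    rpc_subscriptions: Arc<RpcSubscriptions>,
) -> (Blockstore, Option<JoinHandle<()>>) {
    let BlockstoreSignals {
        blockstore,
        completed_slots_receivers: [rpc_completed_slots_receiver, _cluster_completed_slots_receiver],
        ..
    } = Blockstore::open_with_signal(ledger_path, None, true).unwrap();
    let handle =
        RpcCompletedSlotsService::spawn(rpc_completed_slots_receiver, Some(rpc_subscriptions));
    (blockstore, handle)
}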

View File

@@ -952,6 +952,11 @@ impl RpcSubscriptions {
pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) {
self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::CreatedBank {
slot,
parent,
timestamp: timestamp(),
}));
}
pub fn notify_signatures_received(&self, slot_signatures: (Slot, Vec<Signature>)) {

View File

@@ -2,13 +2,9 @@
use crate::sigverify;
use crate::sigverify_stage::SigVerifier;
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::shred::{OFFSET_OF_SHRED_SLOT, SIZE_OF_SHRED_SLOT};
use solana_ledger::shred::Shred;
use solana_ledger::sigverify_shreds::verify_shreds_gpu;
use solana_perf::{
self,
packet::{limited_deserialize, Packets},
recycler_cache::RecyclerCache,
};
use solana_perf::{self, packet::Packets, recycler_cache::RecyclerCache};
use solana_runtime::bank_forks::BankForks;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
@@ -38,18 +34,7 @@ impl ShredSigVerifier {
fn read_slots(batches: &[Packets]) -> HashSet<u64> {
batches
.iter()
.flat_map(|batch| {
batch.packets.iter().filter_map(|packet| {
let slot_start = OFFSET_OF_SHRED_SLOT;
let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
trace!("slot {} {}", slot_start, slot_end,);
if slot_end <= packet.meta.size {
limited_deserialize(&packet.data[slot_start..slot_end]).ok()
} else {
None
}
})
})
.flat_map(|batch| batch.packets.iter().filter_map(Shred::get_slot_from_packet))
.collect()
}
}

View File

@@ -107,7 +107,7 @@ impl Tvu {
tower: Tower,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver,
completed_slots_receivers: [CompletedSlotsReceiver; 2],
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
cfg: Option<Arc<AtomicBool>>,
transaction_status_sender: Option<TransactionStatusSender>,
@@ -168,7 +168,7 @@ impl Tvu {
repair_socket,
verified_receiver,
&exit,
completed_slots_receiver,
completed_slots_receivers,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
cfg,
tvu_config.shred_version,
@@ -356,7 +356,7 @@ pub mod tests {
let BlockstoreSignals {
blockstore,
ledger_signal_receiver,
completed_slots_receiver,
completed_slots_receivers,
..
} = Blockstore::open_with_signal(&blockstore_path, None, true)
.expect("Expected to successfully open ledger");
@@ -398,7 +398,7 @@ pub mod tests {
tower,
&leader_schedule_cache,
&exit,
completed_slots_receiver,
completed_slots_receivers,
block_commitment_cache,
None,
None,

View File

@@ -376,7 +376,7 @@ impl Validator {
bank_forks,
blockstore,
ledger_signal_receiver,
completed_slots_receiver,
completed_slots_receivers,
leader_schedule_cache,
snapshot_hash,
TransactionHistoryServices {
@@ -694,7 +694,7 @@ impl Validator {
tower,
&leader_schedule_cache,
&exit,
completed_slots_receiver,
completed_slots_receivers,
block_commitment_cache,
config.enable_partition.clone(),
transaction_status_sender.clone(),
@@ -1010,7 +1010,7 @@ fn new_banks_from_ledger(
BankForks,
Arc<Blockstore>,
Receiver<bool>,
CompletedSlotsReceiver,
[CompletedSlotsReceiver; 2],
LeaderScheduleCache,
Option<(Slot, Hash)>,
TransactionHistoryServices,
@@ -1041,7 +1041,7 @@ fn new_banks_from_ledger(
let BlockstoreSignals {
mut blockstore,
ledger_signal_receiver,
completed_slots_receiver,
completed_slots_receivers,
..
} = Blockstore::open_with_signal(
ledger_path,
@@ -1165,7 +1165,7 @@ fn new_banks_from_ledger(
bank_forks,
blockstore,
ledger_signal_receiver,
completed_slots_receiver,
completed_slots_receivers,
leader_schedule_cache,
snapshot_hash,
transaction_history_services,

View File

@@ -7,12 +7,13 @@ use serde_json::{json, Value};
use solana_account_decoder::UiAccount;
use solana_client::{
rpc_client::RpcClient,
rpc_response::{Response, RpcSignatureResult},
rpc_response::{Response, RpcSignatureResult, SlotUpdate},
};
use solana_core::{rpc_pubsub::gen_client::Client as PubsubClient, test_validator::TestValidator};
use solana_sdk::{
commitment_config::CommitmentConfig,
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
system_transaction,
transaction::Transaction,
@@ -20,7 +21,7 @@ use solana_sdk::{
use std::{
collections::HashSet,
net::UdpSocket,
sync::mpsc::channel,
sync::{mpsc::channel, Arc},
thread::sleep,
time::{Duration, Instant},
};
@@ -140,6 +141,70 @@ fn test_rpc_invalid_requests() {
assert!(the_value.is_null());
}
#[test]
fn test_rpc_slot_updates() {
solana_logger::setup();
let test_validator = TestValidator::with_no_fees(Pubkey::new_unique());
// Create the pub sub runtime
let rt = Runtime::new().unwrap();
let rpc_pubsub_url = test_validator.rpc_pubsub_url();
let (update_sender, update_receiver) = channel::<Arc<SlotUpdate>>();
// Subscribe to slot updates
rt.spawn(async move {
let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap();
let client = connect.await.unwrap();
tokio_02::spawn(async move {
let mut update_sub = client.slots_updates_subscribe().unwrap();
loop {
let response = update_sub.next().await.unwrap();
update_sender.send(response.unwrap()).unwrap();
}
});
});
let first_update = update_receiver
.recv_timeout(Duration::from_secs(2))
.unwrap();
// Verify that updates are received in order for an upcoming slot
let verify_slot = first_update.slot() + 2;
let mut expected_update_index = 0;
let expected_updates = vec![
"CreatedBank",
"Completed",
"Frozen",
"OptimisticConfirmation",
"Root",
];
let test_start = Instant::now();
loop {
assert!(test_start.elapsed() < Duration::from_secs(30));
let update = update_receiver
.recv_timeout(Duration::from_secs(2))
.unwrap();
if update.slot() == verify_slot {
let update_name = match *update {
SlotUpdate::CreatedBank { .. } => "CreatedBank",
SlotUpdate::Completed { .. } => "Completed",
SlotUpdate::Frozen { .. } => "Frozen",
SlotUpdate::OptimisticConfirmation { .. } => "OptimisticConfirmation",
SlotUpdate::Root { .. } => "Root",
_ => continue,
};
assert_eq!(update_name, expected_updates[expected_update_index]);
expected_update_index += 1;
if expected_update_index == expected_updates.len() {
break;
}
}
}
}
#[test]
fn test_rpc_subscriptions() {
solana_logger::setup();

View File

@@ -114,7 +114,7 @@ pub struct CompletedDataSetInfo {
pub struct BlockstoreSignals {
pub blockstore: Blockstore,
pub ledger_signal_receiver: Receiver<bool>,
pub completed_slots_receiver: CompletedSlotsReceiver,
pub completed_slots_receivers: [CompletedSlotsReceiver; 2],
}
// ledger window
@@ -378,15 +378,18 @@ impl Blockstore {
enforce_ulimit_nofile,
)?;
let (ledger_signal_sender, ledger_signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) =
let (completed_slots_sender1, completed_slots_receiver1) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
let (completed_slots_sender2, completed_slots_receiver2) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
blockstore.new_shreds_signals = vec![ledger_signal_sender];
blockstore.completed_slots_senders = vec![completed_slots_sender];
blockstore.completed_slots_senders = vec![completed_slots_sender1, completed_slots_sender2];
Ok(BlockstoreSignals {
blockstore,
ledger_signal_receiver,
completed_slots_receiver,
completed_slots_receivers: [completed_slots_receiver1, completed_slots_receiver2],
})
}
@@ -4302,7 +4305,7 @@ pub mod tests {
let ledger_path = get_tmp_ledger_path!();
let BlockstoreSignals {
blockstore: ledger,
completed_slots_receiver: recvr,
completed_slots_receivers: [recvr, _],
..
} = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
let ledger = Arc::new(ledger);
@@ -4328,7 +4331,7 @@ pub mod tests {
let ledger_path = get_tmp_ledger_path!();
let BlockstoreSignals {
blockstore: ledger,
completed_slots_receiver: recvr,
completed_slots_receivers: [recvr, _],
..
} = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
let ledger = Arc::new(ledger);
@@ -4372,7 +4375,7 @@ pub mod tests {
let ledger_path = get_tmp_ledger_path!();
let BlockstoreSignals {
blockstore: ledger,
completed_slots_receiver: recvr,
completed_slots_receivers: [recvr, _],
..
} = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
let ledger = Arc::new(ledger);

View File

@@ -489,6 +489,18 @@ impl Shred {
}
}
// Get slot from a shred packet with partial deserialize
pub fn get_slot_from_packet(p: &Packet) -> Option<Slot> {
let slot_start = OFFSET_OF_SHRED_SLOT;
let slot_end = slot_start + SIZE_OF_SHRED_SLOT;
if slot_end > p.meta.size {
return None;
}
limited_deserialize::<Slot>(&p.data[slot_start..slot_end]).ok()
}
pub fn reference_tick_from_data(data: &[u8]) -> u8 {
let flags = data[SIZE_OF_COMMON_SHRED_HEADER + SIZE_OF_DATA_SHRED_HEADER
- size_of::<u8>()

View File

@@ -754,6 +754,15 @@ pub struct Bank {
/// The number of transactions processed without error
transaction_count: AtomicU64,
/// The number of transaction errors in this slot
transaction_error_count: AtomicU64,
/// The number of transaction entries in this slot
transaction_entries_count: AtomicU64,
/// The max number of transactions in an entry in this slot
transactions_per_entry_max: AtomicU64,
/// Bank tick height
tick_height: AtomicU64,
@@ -1020,6 +1029,9 @@ impl Bank {
capitalization: AtomicU64::new(parent.capitalization()),
inflation: parent.inflation.clone(),
transaction_count: AtomicU64::new(parent.transaction_count()),
transaction_error_count: AtomicU64::new(0),
transaction_entries_count: AtomicU64::new(0),
transactions_per_entry_max: AtomicU64::new(0),
// we will .clone_with_epoch() this soon after stake data update; so just .clone() for now
stakes: RwLock::new(parent.stakes.read().unwrap().clone()),
epoch_stakes: parent.epoch_stakes.clone(),
@@ -1158,6 +1170,9 @@ impl Bank {
parent_slot: fields.parent_slot,
hard_forks: Arc::new(RwLock::new(fields.hard_forks)),
transaction_count: AtomicU64::new(fields.transaction_count),
transaction_error_count: new(),
transaction_entries_count: new(),
transactions_per_entry_max: new(),
tick_height: AtomicU64::new(fields.tick_height),
signature_count: AtomicU64::new(fields.signature_count),
capitalization: AtomicU64::new(fields.capitalization),
@@ -3149,6 +3164,16 @@ impl Bank {
inc_new_counter_info!("bank-process_transactions-txs", tx_count as usize);
inc_new_counter_info!("bank-process_transactions-sigs", signature_count as usize);
if !txs.is_empty() {
let processed_tx_count = txs.len() as u64;
let failed_tx_count = processed_tx_count.saturating_sub(tx_count);
self.transaction_error_count
.fetch_add(failed_tx_count, Relaxed);
self.transaction_entries_count.fetch_add(1, Relaxed);
self.transactions_per_entry_max
.fetch_max(processed_tx_count, Relaxed);
}
if executed
.iter()
.any(|(res, _nonce_rollback)| Self::can_commit(res))
@@ -4088,6 +4113,18 @@ impl Bank {
self.transaction_count.load(Relaxed)
}
pub fn transaction_error_count(&self) -> u64 {
self.transaction_error_count.load(Relaxed)
}
pub fn transaction_entries_count(&self) -> u64 {
self.transaction_entries_count.load(Relaxed)
}
pub fn transactions_per_entry_max(&self) -> u64 {
self.transactions_per_entry_max.load(Relaxed)
}
fn increment_transaction_count(&self, tx_count: u64) {
self.transaction_count.fetch_add(tx_count, Relaxed);
}
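A worked example of the per-batch accounting in the hunk above (hypothetical numbers): a committed batch of 5 transactions of which 3 land increments transaction_entries_count by 1, transaction_error_count by 2, and raises transactions_per_entry_max to at least 5. A standalone sketch of the same arithmetic:

use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

// Mirrors the counter updates above for a single non-empty batch:
// `processed` is the batch size, `committed` the transactions that landed.
fn record_batch_stats(
    error_count: &AtomicU64,
    entries_count: &AtomicU64,
    per_entry_max: &AtomicU64,
    processed: u64,
    committed: u64,
) {
    error_count.fetch_add(processed.saturating_sub(committed), Relaxed);
    entries_count.fetch_add(1, Relaxed);
    per_entry_max.fetch_max(processed, Relaxed);
}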