Plumb slot update pubsub notifications (#15488)

carllin 2021-02-28 23:29:11 -08:00 committed by GitHub
parent 33eaa2b238
commit ae96ba3459
7 changed files with 194 additions and 3 deletions


@@ -101,6 +101,15 @@ pub struct SlotInfo {
pub root: Slot,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum SlotUpdate {
OptimisticConfirmation { slot: Slot, timestamp: u64 },
FirstShredReceived { slot: Slot, timestamp: u64 },
Frozen { slot: Slot, timestamp: u64 },
Root { slot: Slot, timestamp: u64 },
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase", untagged)]
pub enum RpcSignatureResult {
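For reference, the serde attributes above (tag = "type", rename_all = "camelCase") turn the variant name into a camelCased "type" field on the wire. A minimal sketch of the resulting JSON, assuming a scratch binary with serde_json available (not part of this commit):

use solana_client::rpc_response::SlotUpdate;

fn main() {
    // Timestamps come from solana_sdk::timing::timestamp(): milliseconds since the Unix epoch.
    let update = SlotUpdate::FirstShredReceived {
        slot: 112233,
        timestamp: 1_614_582_000_000,
    };
    // Prints: {"type":"firstShredReceived","slot":112233,"timestamp":1614582000000}
    println!("{}", serde_json::to_string(&update).unwrap());
}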


@@ -94,6 +94,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
cluster_info,
packet_receiver,
&Arc::new(MaxSlots::default()),
None,
);
let mut index = 0;


@@ -4,8 +4,9 @@
use crate::rpc_subscriptions::RpcSubscriptions;
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_client::rpc_response::SlotUpdate;
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::clock::Slot;
use solana_sdk::{clock::Slot, timing::timestamp};
use std::{
collections::HashSet,
sync::{
@@ -130,6 +131,12 @@ impl OptimisticallyConfirmedBankTracker {
} else {
inc_new_counter_info!("dropped-already-rooted-optimistic-bank-notification", 1);
}
// Send slot notification regardless of whether the bank is replayed
subscriptions.notify_slot_update(SlotUpdate::OptimisticConfirmation {
slot,
timestamp: timestamp(),
});
}
BankNotification::Frozen(bank) => {
let frozen_slot = bank.slot();
@@ -142,6 +149,10 @@ impl OptimisticallyConfirmedBankTracker {
}
drop(w_optimistically_confirmed_bank);
}
subscriptions.notify_slot_update(SlotUpdate::Frozen {
slot: frozen_slot,
timestamp: timestamp(),
});
}
BankNotification::Root(bank) => {
let root_slot = bank.slot();
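For context, a downstream consumer of these notifications would typically branch on the update kind; a small sketch (not part of this commit, helper name hypothetical):

use solana_client::rpc_response::SlotUpdate;

// Hypothetical helper illustrating the four update kinds plumbed in this commit.
fn log_update(update: &SlotUpdate) {
    match update {
        SlotUpdate::FirstShredReceived { slot, timestamp } => {
            println!("slot {}: first shred received at {}", slot, timestamp)
        }
        SlotUpdate::OptimisticConfirmation { slot, timestamp } => {
            println!("slot {}: optimistically confirmed at {}", slot, timestamp)
        }
        SlotUpdate::Frozen { slot, timestamp } => {
            println!("slot {}: bank frozen at {}", slot, timestamp)
        }
        SlotUpdate::Root { slot, timestamp } => {
            println!("slot {}: rooted at {}", slot, timestamp)
        }
    }
}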


@@ -12,10 +12,12 @@ use crate::{
repair_service::DuplicateSlotsResetSender,
repair_service::RepairInfo,
result::{Error, Result},
rpc_subscriptions::RpcSubscriptions,
window_service::{should_retransmit_and_persist, WindowService},
};
use crossbeam_channel::Receiver;
use lru::LruCache;
use solana_client::rpc_response::SlotUpdate;
use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
use solana_ledger::{
blockstore::{Blockstore, CompletedSlotsReceiver},
@@ -32,7 +34,7 @@ use solana_streamer::streamer::PacketReceiver;
use std::{
cmp,
collections::hash_set::HashSet,
collections::{BTreeMap, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
net::UdpSocket,
ops::{Deref, DerefMut},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
@@ -237,6 +239,43 @@ fn check_if_already_received(
}
}
fn notify_first_shred_received(
shred_slot: Slot,
rpc_subscriptions: &RpcSubscriptions,
sent_received_slot_notification: &Mutex<BTreeSet<Slot>>,
root_bank: &Bank,
) {
let notify_slot = {
let mut sent_received_slot_notification_locked =
sent_received_slot_notification.lock().unwrap();
if !sent_received_slot_notification_locked.contains(&shred_slot)
&& shred_slot > root_bank.slot()
{
sent_received_slot_notification_locked.insert(shred_slot);
if sent_received_slot_notification_locked.len() > 100 {
let mut slots_before_root =
sent_received_slot_notification_locked.split_off(&(root_bank.slot() + 1));
// `split_off` returned the slots > root; the swap below keeps those in the
// locked set, leaving the slots <= root in `slots_before_root` to be dropped
std::mem::swap(
&mut slots_before_root,
&mut sent_received_slot_notification_locked,
);
}
Some(shred_slot)
} else {
None
}
};
if let Some(slot) = notify_slot {
info!("First time receiving a shred from slot: {}", slot);
rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
slot,
timestamp: timestamp(),
});
}
}
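The pruning above relies on BTreeSet::split_off, which leaves everything below the key in the original set and returns everything at or above it; a standalone sketch of the idiom (illustrative only, not part of this commit):

use std::collections::BTreeSet;

fn main() {
    let mut seen: BTreeSet<u64> = (1..=10).collect();
    let root: u64 = 6;
    // split_off returns all slots >= root + 1; `seen` temporarily retains slots <= root.
    let mut kept = seen.split_off(&(root + 1));
    // Swap so `seen` keeps only slots > root; the pruned slots <= root get dropped.
    std::mem::swap(&mut kept, &mut seen);
    assert_eq!(seen, (7..=10).collect::<BTreeSet<u64>>());
    assert_eq!(kept, (1..=6).collect::<BTreeSet<u64>>());
}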
// Returns true if turbine retransmit peers patch (#14565) is enabled.
fn enable_turbine_retransmit_peers_patch(shred_slot: Slot, root_bank: &Bank) -> bool {
let feature_slot = root_bank
@@ -266,6 +305,8 @@ fn retransmit(
last_peer_update: &AtomicU64,
shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots,
sent_received_slot_notification: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: &Option<Arc<RpcSubscriptions>>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let r_lock = r.lock().unwrap();
@@ -341,6 +382,16 @@
None => continue,
};
max_slot = max_slot.max(shred_slot);
if let Some(rpc_subscriptions) = rpc_subscriptions {
notify_first_shred_received(
shred_slot,
rpc_subscriptions,
sent_received_slot_notification,
&root_bank,
);
}
let mut compute_turbine_peers = Measure::start("turbine_start");
let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
&my_id,
@@ -439,12 +490,14 @@ pub fn retransmitter(
cluster_info: Arc<ClusterInfo>,
r: Arc<Mutex<PacketReceiver>>,
max_slots: &Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Vec<JoinHandle<()>> {
let stats = Arc::new(RetransmitStats::default());
let shreds_received = Arc::new(Mutex::new((
LruCache::new(DEFAULT_LRU_SIZE),
PacketHasher::default(),
)));
let sent_received_slot_notification = Arc::new(Mutex::new(BTreeSet::new()));
(0..sockets.len())
.map(|s| {
let sockets = sockets.clone();
@@ -457,6 +510,8 @@
let last_peer_update = Arc::new(AtomicU64::new(0));
let shreds_received = shreds_received.clone();
let max_slots = max_slots.clone();
let sent_received_slot_notification = sent_received_slot_notification.clone();
let rpc_subscriptions = rpc_subscriptions.clone();
Builder::new()
.name("solana-retransmitter".to_string())
@@ -475,6 +530,8 @@
&last_peer_update,
&shreds_received,
&max_slots,
&sent_received_slot_notification,
&rpc_subscriptions,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@@ -520,6 +577,7 @@ impl RetransmitStage {
repair_validators: Option<HashSet<Pubkey>>,
completed_data_sets_sender: CompletedDataSetsSender,
max_slots: &Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
@@ -531,6 +589,7 @@
cluster_info.clone(),
retransmit_receiver,
max_slots,
rpc_subscriptions,
);
let leader_schedule_cache_clone = leader_schedule_cache.clone();
@@ -649,6 +708,7 @@ mod tests {
cluster_info,
Arc::new(Mutex::new(retransmit_receiver)),
&Arc::new(MaxSlots::default()),
None,
);
let _thread_hdls = vec![t_retransmit];


@@ -12,6 +12,7 @@ use solana_client::{
},
rpc_response::{
Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult, SlotInfo,
SlotUpdate,
},
};
#[cfg(test)]
@@ -139,6 +140,30 @@ pub trait RpcSolPubSub {
)]
fn slot_unsubscribe(&self, meta: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool>;
// Get series of updates for all slots
#[pubsub(
subscription = "slotsUpdatesNotification",
subscribe,
name = "slotsUpdatesSubscribe"
)]
fn slots_updates_subscribe(
&self,
meta: Self::Metadata,
subscriber: Subscriber<Arc<SlotUpdate>>,
);
// Unsubscribe from slots updates notification subscription.
#[pubsub(
subscription = "slotsUpdatesNotification",
unsubscribe,
name = "slotsUpdatesUnsubscribe"
)]
fn slots_updates_unsubscribe(
&self,
meta: Option<Self::Metadata>,
id: SubscriptionId,
) -> Result<bool>;
// Get notification when vote is encountered
#[pubsub(subscription = "voteNotification", subscribe, name = "voteSubscribe")]
fn vote_subscribe(&self, meta: Self::Metadata, subscriber: Subscriber<RpcVote>);
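Per the #[pubsub] attributes above, a WebSocket client subscribes with the slotsUpdatesSubscribe method and then receives slotsUpdatesNotification messages. A sketch of the request body, assuming serde_json on the client side (not part of this commit):

fn main() {
    // JSON-RPC subscribe request, using the method name declared above.
    let request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "slotsUpdatesSubscribe"
    });
    println!("{}", request);
    // The server replies with a subscription id, then streams
    // slotsUpdatesNotification messages whose result is a SlotUpdate, e.g.
    // {"type":"optimisticConfirmation","slot":112233,"timestamp":1614582000000}
}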
@@ -428,6 +453,40 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
}
}
fn slots_updates_subscribe(
&self,
_meta: Self::Metadata,
subscriber: Subscriber<Arc<SlotUpdate>>,
) {
info!("slots_updates_subscribe");
if let Err(err) = self.check_subscription_count() {
subscriber.reject(err).unwrap_or_default();
return;
}
let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed);
let sub_id = SubscriptionId::Number(id as u64);
info!("slots_updates_subscribe: id={:?}", sub_id);
self.subscriptions
.add_slots_updates_subscription(sub_id, subscriber);
}
fn slots_updates_unsubscribe(
&self,
_meta: Option<Self::Metadata>,
id: SubscriptionId,
) -> Result<bool> {
info!("slots_updates_unsubscribe");
if self.subscriptions.remove_slots_updates_subscription(&id) {
Ok(true)
} else {
Err(Error {
code: ErrorCode::InvalidParams,
message: "Invalid Request: Subscription id does not exist".into(),
data: None,
})
}
}
fn vote_subscribe(&self, _meta: Self::Metadata, subscriber: Subscriber<RpcVote>) {
info!("vote_subscribe");
if let Err(err) = self.check_subscription_count() {


@@ -16,7 +16,7 @@ use solana_client::{
rpc_filter::RpcFilterType,
rpc_response::{
ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcKeyedAccount,
RpcLogsResponse, RpcResponseContext, RpcSignatureResult, SlotInfo,
RpcLogsResponse, RpcResponseContext, RpcSignatureResult, SlotInfo, SlotUpdate,
},
};
use solana_measure::measure::Measure;
@@ -33,6 +33,7 @@ use solana_sdk::{
commitment_config::CommitmentConfig,
pubkey::Pubkey,
signature::Signature,
timing::timestamp,
transaction,
};
use solana_vote_program::vote_state::Vote;
@@ -82,6 +83,7 @@ pub struct RpcVote {
enum NotificationEntry {
Slot(SlotInfo),
SlotUpdate(SlotUpdate),
Vote(Vote),
Root(Slot),
Bank(CommitmentSlots),
@@ -95,6 +97,9 @@ impl std::fmt::Debug for NotificationEntry {
NotificationEntry::Root(root) => write!(f, "Root({})", root),
NotificationEntry::Vote(vote) => write!(f, "Vote({:?})", vote),
NotificationEntry::Slot(slot_info) => write!(f, "Slot({:?})", slot_info),
NotificationEntry::SlotUpdate(slot_update) => {
write!(f, "SlotUpdate({:?})", slot_update)
}
NotificationEntry::Bank(commitment_slots) => {
write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot)
}
@@ -142,6 +147,7 @@ type RpcSignatureSubscriptions = RwLock<
>,
>;
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
type RpcSlotUpdateSubscriptions = RwLock<HashMap<SubscriptionId, Sink<Arc<SlotUpdate>>>>;
type RpcVoteSubscriptions = RwLock<HashMap<SubscriptionId, Sink<RpcVote>>>;
type RpcRootSubscriptions = RwLock<HashMap<SubscriptionId, Sink<Slot>>>;
@@ -387,6 +393,7 @@ struct Subscriptions {
gossip_program_subscriptions: Arc<RpcProgramSubscriptions>,
gossip_signature_subscriptions: Arc<RpcSignatureSubscriptions>,
slot_subscriptions: Arc<RpcSlotSubscriptions>,
slots_updates_subscriptions: Arc<RpcSlotUpdateSubscriptions>,
vote_subscriptions: Arc<RpcVoteSubscriptions>,
root_subscriptions: Arc<RpcRootSubscriptions>,
}
@@ -465,6 +472,7 @@ impl RpcSubscriptions {
let gossip_program_subscriptions = Arc::new(RpcProgramSubscriptions::default());
let gossip_signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default());
let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default());
let slots_updates_subscriptions = Arc::new(RpcSlotUpdateSubscriptions::default());
let vote_subscriptions = Arc::new(RpcVoteSubscriptions::default());
let root_subscriptions = Arc::new(RpcRootSubscriptions::default());
let notification_sender = Arc::new(Mutex::new(notification_sender));
@@ -482,6 +490,7 @@
gossip_program_subscriptions,
gossip_signature_subscriptions,
slot_subscriptions,
slots_updates_subscriptions,
vote_subscriptions,
root_subscriptions,
};
@@ -903,6 +912,10 @@ impl RpcSubscriptions {
self.enqueue_notification(NotificationEntry::Gossip(slot));
}
pub fn notify_slot_update(&self, slot_update: SlotUpdate) {
self.enqueue_notification(NotificationEntry::SlotUpdate(slot_update));
}
pub fn add_slot_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<SlotInfo>) {
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let mut subscriptions = self.subscriptions.slot_subscriptions.write().unwrap();
@@ -914,6 +927,29 @@
subscriptions.remove(id).is_some()
}
pub fn add_slots_updates_subscription(
&self,
sub_id: SubscriptionId,
subscriber: Subscriber<Arc<SlotUpdate>>,
) {
let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let mut subscriptions = self
.subscriptions
.slots_updates_subscriptions
.write()
.unwrap();
subscriptions.insert(sub_id, sink);
}
pub fn remove_slots_updates_subscription(&self, id: &SubscriptionId) -> bool {
let mut subscriptions = self
.subscriptions
.slots_updates_subscriptions
.write()
.unwrap();
subscriptions.remove(id).is_some()
}
pub fn notify_slot(&self, slot: Slot, parent: Slot, root: Slot) {
self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
}
@@ -957,6 +993,10 @@ impl RpcSubscriptions {
pub fn notify_roots(&self, mut rooted_slots: Vec<Slot>) {
rooted_slots.sort_unstable();
rooted_slots.into_iter().for_each(|root| {
self.enqueue_notification(NotificationEntry::SlotUpdate(SlotUpdate::Root {
slot: root,
timestamp: timestamp(),
}));
self.enqueue_notification(NotificationEntry::Root(root));
});
}
@@ -1005,6 +1045,15 @@
notifier.notify(slot_info, sink);
}
}
NotificationEntry::SlotUpdate(slot_update) => {
let subscriptions =
subscriptions.slots_updates_subscriptions.read().unwrap();
let slot_update = Arc::new(slot_update);
for (_, sink) in subscriptions.iter() {
inc_new_counter_info!("rpc-subscription-notify-slots-updates", 1);
notifier.notify(slot_update.clone(), sink);
}
}
// These notifications are only triggered by votes observed on gossip,
// unlike `NotificationEntry::Gossip`, which also accounts for slots seen
// in VoteState's from bank states built in ReplayStage.
@@ -1021,6 +1070,7 @@
inc_new_counter_info!("rpc-subscription-notify-vote", 1);
notifier.notify(
RpcVote {
// TODO: Remove clones
slots: vote_info.slots.clone(),
hash: bs58::encode(vote_info.hash).into_string(),
timestamp: vote_info.timestamp,


@@ -178,6 +178,7 @@ impl Tvu {
tvu_config.repair_validators,
completed_data_sets_sender,
max_slots,
Some(subscriptions.clone()),
);
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();