Remove unnecessary arc and mutex for rpc notifications (#8351)

This commit is contained in:
Justin Starry 2020-02-21 08:03:46 +08:00 committed by GitHub
parent ab361a8073
commit 01697a9f5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 51 additions and 57 deletions

View File

@ -11,7 +11,6 @@ use solana_sdk::{
account::Account, clock::Slot, pubkey::Pubkey, signature::Signature, transaction,
};
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::ops::DerefMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, SendError, Sender};
use std::thread::{Builder, JoinHandle};
@ -48,8 +47,6 @@ impl std::fmt::Debug for NotificationEntry {
}
}
type NotificationSend = Arc<Mutex<NotificationEntry>>;
type RpcAccountSubscriptions =
RwLock<HashMap<Pubkey, HashMap<SubscriptionId, (Sink<RpcAccount>, Confirmations)>>>;
type RpcProgramSubscriptions =
@ -204,7 +201,7 @@ pub struct RpcSubscriptions {
program_subscriptions: Arc<RpcProgramSubscriptions>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
slot_subscriptions: Arc<RpcSlotSubscriptions>,
notification_sender: Arc<Mutex<Sender<Arc<Mutex<NotificationEntry>>>>>,
notification_sender: Arc<Mutex<Sender<NotificationEntry>>>,
t_cleanup: Option<JoinHandle<()>>,
exit: Arc<AtomicBool>,
}
@ -226,8 +223,8 @@ impl Drop for RpcSubscriptions {
impl RpcSubscriptions {
pub fn new(exit: &Arc<AtomicBool>) -> Self {
let (notification_sender, notification_receiver): (
Sender<NotificationSend>,
Receiver<NotificationSend>,
Sender<NotificationEntry>,
Receiver<NotificationEntry>,
) = std::sync::mpsc::channel();
let account_subscriptions = Arc::new(RpcAccountSubscriptions::default());
@ -397,7 +394,7 @@ impl RpcSubscriptions {
.notification_sender
.lock()
.unwrap()
.send(Arc::new(Mutex::new(notification_entry)))
.send(notification_entry)
{
Ok(()) => (),
Err(SendError(notification)) => {
@ -411,7 +408,7 @@ impl RpcSubscriptions {
fn process_notifications(
exit: Arc<AtomicBool>,
notification_receiver: Receiver<Arc<Mutex<NotificationEntry>>>,
notification_receiver: Receiver<NotificationEntry>,
account_subscriptions: Arc<RpcAccountSubscriptions>,
program_subscriptions: Arc<RpcProgramSubscriptions>,
signature_subscriptions: Arc<RpcSignatureSubscriptions>,
@ -422,57 +419,54 @@ impl RpcSubscriptions {
break;
}
match notification_receiver.recv_timeout(Duration::from_millis(RECEIVE_DELAY_MILLIS)) {
Ok(notification_entry) => {
let mut notification_entry = notification_entry.lock().unwrap();
match notification_entry.deref_mut() {
NotificationEntry::Slot(slot_info) => {
let subscriptions = slot_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() {
sink.notify(Ok(*slot_info)).wait().unwrap();
}
}
NotificationEntry::Bank((current_slot, bank_forks)) => {
let pubkeys: Vec<_> = {
let subs = account_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for pubkey in &pubkeys {
Self::check_account(
pubkey,
*current_slot,
&bank_forks,
account_subscriptions.clone(),
);
}
let programs: Vec<_> = {
let subs = program_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for program_id in &programs {
Self::check_program(
program_id,
*current_slot,
&bank_forks,
program_subscriptions.clone(),
);
}
let signatures: Vec<_> = {
let subs = signature_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for signature in &signatures {
Self::check_signature(
signature,
*current_slot,
&bank_forks,
signature_subscriptions.clone(),
);
}
Ok(notification_entry) => match notification_entry {
NotificationEntry::Slot(slot_info) => {
let subscriptions = slot_subscriptions.read().unwrap();
for (_, sink) in subscriptions.iter() {
sink.notify(Ok(slot_info)).wait().unwrap();
}
}
}
NotificationEntry::Bank((current_slot, bank_forks)) => {
let pubkeys: Vec<_> = {
let subs = account_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for pubkey in &pubkeys {
Self::check_account(
pubkey,
current_slot,
&bank_forks,
account_subscriptions.clone(),
);
}
let programs: Vec<_> = {
let subs = program_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for program_id in &programs {
Self::check_program(
program_id,
current_slot,
&bank_forks,
program_subscriptions.clone(),
);
}
let signatures: Vec<_> = {
let subs = signature_subscriptions.read().unwrap();
subs.keys().cloned().collect()
};
for signature in &signatures {
Self::check_signature(
signature,
current_slot,
&bank_forks,
signature_subscriptions.clone(),
);
}
}
},
Err(RecvTimeoutError::Timeout) => {
// not a problem - try reading again
}