diff --git a/accounts-bench/Cargo.toml b/accounts-bench/Cargo.toml index 4cf0408acf..9981bdd6fa 100644 --- a/accounts-bench/Cargo.toml +++ b/accounts-bench/Cargo.toml @@ -10,10 +10,10 @@ homepage = "https://solana.com/" [dependencies] log = "0.4.6" rayon = "1.4.0" -solana-logger = { path = "../logger", version = "1.4.0" } -solana-runtime = { path = "../runtime", version = "1.4.0" } -solana-measure = { path = "../measure", version = "1.4.0" } -solana-sdk = { path = "../sdk", version = "1.4.0" } +solana-logger = { path = "../logger", version = "1.3.6" } +solana-runtime = { path = "../runtime", version = "1.3.6" } +solana-measure = { path = "../measure", version = "1.3.6" } +solana-sdk = { path = "../sdk", version = "1.3.6" } rand = "0.7.0" clap = "2.33.1" crossbeam-channel = "0.4" diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index db22332884..116c145bbe 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -5,6 +5,9 @@ use serde_json::{ value::Value::{Number, Object}, Map, Value, }; +use solana_sdk::{ + commitment_config::CommitmentConfig, signature::Signature, transaction::TransactionError, +}; use std::{ marker::PhantomData, sync::{ @@ -18,6 +21,8 @@ use thiserror::Error; use tungstenite::{client::AutoStream, connect, Message, WebSocket}; use url::{ParseError, Url}; +type PubsubSignatureResponse = PubsubClientSubscription>; + #[derive(Debug, Error)] pub enum PubsubClientError { #[error("url parse error")] @@ -33,6 +38,30 @@ pub enum PubsubClientError { UnexpectedMessageError, } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[serde(rename_all = "camelCase", tag = "type", content = "result")] +pub enum SignatureResult { + ProcessedSignatureResult(ProcessedSignatureResult), + ReceivedSignature, +} + +#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)] +pub struct RpcResponseContext { + pub slot: u64, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct ProcessedSignatureResult { + pub err: Option, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RpcResponse { + pub context: RpcResponseContext, + pub value: T, +} + #[derive(Serialize, Deserialize, PartialEq, Clone, Debug)] pub struct SlotInfoMessage { pub parent: u64, @@ -40,6 +69,14 @@ pub struct SlotInfoMessage { pub slot: u64, } +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcSignatureSubscribeConfig { + #[serde(flatten)] + pub commitment: Option, + pub enable_received_notification: Option, +} + pub struct PubsubClientSubscription where T: DeserializeOwned, @@ -73,18 +110,12 @@ where { fn send_subscribe( writable_socket: &Arc>>, - operation: &str, + body: String, ) -> Result { - let method = format!("{}Subscribe", operation); writable_socket .write() .unwrap() - .write_message(Message::Text( - json!({ - "jsonrpc":"2.0","id":1,"method":method,"params":[] - }) - .to_string(), - ))?; + .write_message(Message::Text(body))?; let message = writable_socket.write().unwrap().read_message()?; Self::extract_subscription_id(message) } @@ -148,6 +179,7 @@ where } const SLOT_OPERATION: &str = "slot"; +const SIGNATURE_OPERATION: &str = "signature"; pub struct PubsubClient {} @@ -171,7 +203,10 @@ impl PubsubClient { let exit_clone = exit.clone(); let subscription_id = PubsubClientSubscription::::send_subscribe( &socket_clone, - SLOT_OPERATION, + json!({ + "jsonrpc":"2.0","id":1,"method":format!("{}Subscribe", SLOT_OPERATION),"params":[] + }) + .to_string(), ) .unwrap(); @@ -212,6 +247,80 @@ impl PubsubClient { Ok((result, receiver)) } + + pub fn signature_subscribe( + url: &str, + signature: &Signature, + ) -> Result< + ( + PubsubSignatureResponse, + Receiver>, + ), + PubsubClientError, + > { + let url = Url::parse(url)?; + let (socket, _response) = connect(url)?; + let (sender, receiver) = channel::>(); + + let socket = Arc::new(RwLock::new(socket)); + let socket_clone = socket.clone(); + let exit = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + let body = json!({ + "jsonrpc":"2.0", + "id":1, + "method":format!("{}Subscribe", SIGNATURE_OPERATION), + "params":[ + signature.to_string(), + {"enableReceivedNotification": true } + ] + }) + .to_string(); + let subscription_id = + PubsubClientSubscription::>::send_subscribe( + &socket_clone, + body, + ) + .unwrap(); + + let t_cleanup = std::thread::spawn(move || { + loop { + if exit_clone.load(Ordering::Relaxed) { + break; + } + + let message: Result, PubsubClientError> = + PubsubClientSubscription::read_message(&socket_clone); + + if let Ok(msg) = message { + match sender.send(msg.clone()) { + Ok(_) => (), + Err(err) => { + info!("receive error: {:?}", err); + break; + } + } + } else { + info!("receive error: {:?}", message); + break; + } + } + + info!("websocket - exited receive loop"); + }); + + let result: PubsubClientSubscription> = + PubsubClientSubscription { + message_type: PhantomData, + operation: SIGNATURE_OPERATION, + socket, + subscription_id, + t_cleanup: Some(t_cleanup), + exit, + }; + + Ok((result, receiver)) + } } #[cfg(test)] diff --git a/client/src/rpc_config.rs b/client/src/rpc_config.rs index ed24156569..a335394237 100644 --- a/client/src/rpc_config.rs +++ b/client/src/rpc_config.rs @@ -69,6 +69,14 @@ pub enum RpcTokenAccountsFilter { ProgramId(String), } +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcSignatureSubscribeConfig { + #[serde(flatten)] + pub commitment: Option, + pub enable_received_notification: Option, +} + #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RpcGetConfirmedSignaturesForAddress2Config { diff --git a/client/src/rpc_response.rs b/client/src/rpc_response.rs index 2d70853cda..afd0e1d7f2 100644 --- a/client/src/rpc_response.rs +++ b/client/src/rpc_response.rs @@ -94,9 +94,16 @@ pub struct RpcKeyedAccount { pub account: UiAccount, } +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase", tag = "type", content = "result")] +pub enum RpcSignatureResult { + ProcessedSignatureResult(ProcessedSignatureResult), + ReceivedSignature, +} + #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(rename_all = "camelCase")] -pub struct RpcSignatureResult { +pub struct ProcessedSignatureResult { pub err: Option, } diff --git a/core/src/completed_data_sets_service.rs b/core/src/completed_data_sets_service.rs new file mode 100644 index 0000000000..f12d5ecd8a --- /dev/null +++ b/core/src/completed_data_sets_service.rs @@ -0,0 +1,82 @@ +use crate::rpc_subscriptions::RpcSubscriptions; +use crossbeam_channel::{Receiver, RecvTimeoutError, Sender}; +use solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo}; +use solana_sdk::signature::Signature; +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, +}; + +pub type CompletedDataSetsReceiver = Receiver>; +pub type CompletedDataSetsSender = Sender>; + +pub struct CompletedDataSetsService { + thread_hdl: JoinHandle<()>, +} + +impl CompletedDataSetsService { + pub fn new( + completed_sets_receiver: CompletedDataSetsReceiver, + blockstore: Arc, + rpc_subscriptions: Arc, + exit: &Arc, + ) -> Self { + let exit = exit.clone(); + let thread_hdl = Builder::new() + .name("completed-data-set-service".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + if let Err(RecvTimeoutError::Disconnected) = Self::recv_completed_data_sets( + &completed_sets_receiver, + &blockstore, + &rpc_subscriptions, + ) { + break; + } + }) + .unwrap(); + Self { thread_hdl } + } + + fn recv_completed_data_sets( + completed_sets_receiver: &CompletedDataSetsReceiver, + blockstore: &Blockstore, + rpc_subscriptions: &RpcSubscriptions, + ) -> Result<(), RecvTimeoutError> { + let completed_data_sets = completed_sets_receiver.recv_timeout(Duration::from_secs(1))?; + for completed_set_info in std::iter::once(completed_data_sets) + .chain(completed_sets_receiver.try_iter()) + .flatten() + { + let CompletedDataSetInfo { + slot, + start_index, + end_index, + } = completed_set_info; + match blockstore.get_entries_in_data_block(slot, start_index, end_index, None) { + Ok(entries) => { + let transactions = entries + .into_iter() + .flat_map(|e| e.transactions.into_iter().map(|t| t.signatures[0])) + .collect::>(); + if !transactions.is_empty() { + rpc_subscriptions.notify_signatures_received((slot, transactions)); + } + } + Err(e) => warn!("completed-data-set-service deserialize error: {:?}", e), + } + } + + Ok(()) + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 9649c622d4..268db59c4c 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -12,6 +12,7 @@ pub mod banking_stage; pub mod broadcast_stage; pub mod cluster_info_vote_listener; pub mod commitment_service; +pub mod completed_data_sets_service; mod deprecated; pub mod shred_fetch_stage; #[macro_use] diff --git a/core/src/result.rs b/core/src/result.rs index 3d7e8de282..6c53662936 100644 --- a/core/src/result.rs +++ b/core/src/result.rs @@ -20,6 +20,7 @@ pub enum Error { ReadyTimeoutError, RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), CrossbeamSendError, + TryCrossbeamSendError, TryRecvError(std::sync::mpsc::TryRecvError), Serialize(std::boxed::Box), TransactionError(transaction::TransactionError), @@ -87,6 +88,11 @@ impl std::convert::From> for Error { Error::CrossbeamSendError } } +impl std::convert::From> for Error { + fn from(_e: crossbeam_channel::TrySendError) -> Error { + Error::TryCrossbeamSendError + } +} impl std::convert::From> for Error { fn from(_e: std::sync::mpsc::SendError) -> Error { Error::SendError diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 478db61832..3ef9575c1e 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -5,6 +5,7 @@ use crate::{ cluster_info_vote_listener::VerifiedVoteReceiver, cluster_slots::ClusterSlots, cluster_slots_service::ClusterSlotsService, + completed_data_sets_service::CompletedDataSetsSender, contact_info::ContactInfo, repair_service::DuplicateSlotsResetSender, repair_service::RepairInfo, @@ -419,6 +420,7 @@ impl RetransmitStage { duplicate_slots_reset_sender: DuplicateSlotsResetSender, verified_vote_receiver: VerifiedVoteReceiver, repair_validators: Option>, + completed_data_sets_sender: CompletedDataSetsSender, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -472,6 +474,7 @@ impl RetransmitStage { }, cluster_slots, verified_vote_receiver, + completed_data_sets_sender, ); let thread_hdls = t_retransmit; diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index a0c003011b..c98265b568 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -6,14 +6,12 @@ use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use solana_account_decoder::UiAccount; use solana_client::{ - rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcSignatureResult}, }; #[cfg(test)] use solana_runtime::bank_forks::BankForks; -use solana_sdk::{ - clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature, -}; +use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature}; #[cfg(test)] use std::sync::RwLock; use std::{ @@ -89,7 +87,7 @@ pub trait RpcSolPubSub { meta: Self::Metadata, subscriber: Subscriber>, signature_str: String, - commitment: Option, + config: Option, ); // Unsubscribe from signature notification subscription. @@ -248,7 +246,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl { _meta: Self::Metadata, subscriber: Subscriber>, signature_str: String, - commitment: Option, + signature_subscribe_config: Option, ) { info!("signature_subscribe"); match param::(&signature_str, "signature") { @@ -259,8 +257,12 @@ impl RpcSolPubSub for RpcSolPubSubImpl { "signature_subscribe: signature={:?} id={:?}", signature, sub_id ); - self.subscriptions - .add_signature_subscription(signature, commitment, sub_id, subscriber); + self.subscriptions.add_signature_subscription( + signature, + signature_subscribe_config, + sub_id, + subscriber, + ); } Err(e) => subscriber.reject(e).unwrap(), } @@ -359,6 +361,7 @@ mod tests { use jsonrpc_pubsub::{PubSubHandler, Session}; use serial_test_derive::serial; use solana_account_decoder::{parse_account_data::parse_account_data, UiAccountEncoding}; + use solana_client::rpc_response::ProcessedSignatureResult; use solana_runtime::{ bank::Bank, bank_forks::BankForks, @@ -369,6 +372,7 @@ mod tests { }, }; use solana_sdk::{ + commitment_config::CommitmentConfig, hash::Hash, message::Message, pubkey::Pubkey, @@ -442,7 +446,8 @@ mod tests { // Test signature confirmation notification let (response, _) = robust_poll_or_panic(receiver); - let expected_res = RpcSignatureResult { err: None }; + let expected_res = + RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None }); let expected = json!({ "jsonrpc": "2.0", "method": "signatureNotification", @@ -454,6 +459,38 @@ mod tests { "subscription": 0, } }); + + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + // Test "received" + let session = create_session(); + let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification"); + rpc.signature_subscribe( + session, + subscriber, + tx.signatures[0].to_string(), + Some(RpcSignatureSubscribeConfig { + commitment: None, + enable_received_notification: Some(true), + }), + ); + let received_slot = 1; + rpc.subscriptions + .notify_signatures_received((received_slot, vec![tx.signatures[0]])); + // Test signature confirmation notification + let (response, _) = robust_poll_or_panic(receiver); + let expected_res = RpcSignatureResult::ReceivedSignature; + let expected = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": received_slot }, + "value": expected_res, + }, + "subscription": 1, + } + }); assert_eq!(serde_json::to_string(&expected).unwrap(), response); } diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index 7a230614fc..5d2a916be1 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -10,9 +10,11 @@ use jsonrpc_pubsub::{ use serde::Serialize; use solana_account_decoder::{parse_token::spl_token_id_v2_0, UiAccount, UiAccountEncoding}; use solana_client::{ - rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, rpc_filter::RpcFilterType, - rpc_response::{Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult}, + rpc_response::{ + ProcessedSignatureResult, Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult, + }, }; use solana_runtime::{ bank::Bank, @@ -67,6 +69,7 @@ enum NotificationEntry { Frozen(Slot), Bank(CommitmentSlots), Gossip(Slot), + SignaturesReceived((Slot, Vec)), } impl std::fmt::Debug for NotificationEntry { @@ -79,6 +82,9 @@ impl std::fmt::Debug for NotificationEntry { NotificationEntry::Bank(commitment_slots) => { write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot) } + NotificationEntry::SignaturesReceived(slot_signatures) => { + write!(f, "SignaturesReceived({:?})", slot_signatures) + } NotificationEntry::Gossip(slot) => write!(f, "Gossip({:?})", slot), } } @@ -108,7 +114,10 @@ type RpcProgramSubscriptions = RwLock< >, >; type RpcSignatureSubscriptions = RwLock< - HashMap, ()>>>, + HashMap< + Signature, + HashMap, bool>>, + >, >; type RpcSlotSubscriptions = RwLock>>; type RpcVoteSubscriptions = RwLock>>; @@ -134,13 +143,11 @@ fn add_subscription( last_notified_slot: RwLock::new(last_notified_slot), config, }; - if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) { - current_hashmap.insert(sub_id, subscription_data); - return; - } - let mut hashmap = HashMap::new(); - hashmap.insert(sub_id, subscription_data); - subscriptions.insert(hashmap_key, hashmap); + + subscriptions + .entry(hashmap_key) + .or_default() + .insert(sub_id, subscription_data); } fn remove_subscription( @@ -279,15 +286,15 @@ fn filter_signature_result( result: Option>, _signature: &Signature, last_notified_slot: Slot, - _config: Option<()>, + _config: Option, _bank: Option>, ) -> (Box>, Slot) { ( - Box::new( - result - .into_iter() - .map(|result| RpcSignatureResult { err: result.err() }), - ), + Box::new(result.into_iter().map(|result| { + RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { + err: result.err(), + }) + })), last_notified_slot, ) } @@ -629,13 +636,18 @@ impl RpcSubscriptions { pub fn add_signature_subscription( &self, signature: Signature, - commitment: Option, + signature_subscribe_config: Option, sub_id: SubscriptionId, subscriber: Subscriber>, ) { + let (commitment, enable_received_notification) = signature_subscribe_config + .map(|config| (config.commitment, config.enable_received_notification)) + .unwrap_or((None, Some(false))); + let commitment_level = commitment .unwrap_or_else(CommitmentConfig::recent) .commitment; + let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { self.subscriptions .gossip_signature_subscriptions @@ -651,7 +663,7 @@ impl RpcSubscriptions { sub_id, subscriber, 0, // last_notified_slot is not utilized for signature subscriptions - None, + enable_received_notification, ); } @@ -696,6 +708,10 @@ impl RpcSubscriptions { self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root })); } + pub fn notify_signatures_received(&self, slot_signatures: (Slot, Vec)) { + self.enqueue_notification(NotificationEntry::SignaturesReceived(slot_signatures)); + } + pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber) { let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap(); @@ -840,6 +856,13 @@ impl RpcSubscriptions { ); } } + NotificationEntry::SignaturesReceived(slot_signatures) => { + RpcSubscriptions::process_signatures_received( + &slot_signatures, + &subscriptions.signature_subscriptions, + ¬ifier, + ) + } }, Err(RecvTimeoutError::Timeout) => { // not a problem - try reading again @@ -939,6 +962,40 @@ impl RpcSubscriptions { } } + fn process_signatures_received( + (received_slot, signatures): &(Slot, Vec), + signature_subscriptions: &Arc, + notifier: &RpcNotifier, + ) { + for signature in signatures { + if let Some(hashmap) = signature_subscriptions.read().unwrap().get(signature) { + for ( + _, + SubscriptionData { + sink, + config: is_received_notification_enabled, + .. + }, + ) in hashmap.iter() + { + if is_received_notification_enabled + .expect("All signature subscriptions must have this config field set") + { + notifier.notify( + Response { + context: RpcResponseContext { + slot: *received_slot, + }, + value: RpcSignatureResult::ReceivedSignature, + }, + &sink, + ); + } + } + } + } + } + fn shutdown(&mut self) -> std::thread::Result<()> { if let Some(runtime) = self.notifier_runtime.take() { info!("RPC Notifier runtime - shutting down"); @@ -1245,31 +1302,55 @@ pub(crate) mod tests { Subscriber::new_test("signatureNotification"); let (processed_sub, _id_receiver, processed_recv) = Subscriber::new_test("signatureNotification"); + let (processed_sub3, _id_receiver, processed_recv3) = + Subscriber::new_test("signatureNotification"); subscriptions.add_signature_subscription( past_bank_tx.signatures[0], - Some(CommitmentConfig::recent()), + Some(RpcSignatureSubscribeConfig { + commitment: Some(CommitmentConfig::recent()), + enable_received_notification: Some(false), + }), SubscriptionId::Number(1 as u64), past_bank_sub1, ); subscriptions.add_signature_subscription( past_bank_tx.signatures[0], - Some(CommitmentConfig::root()), + Some(RpcSignatureSubscribeConfig { + commitment: Some(CommitmentConfig::root()), + enable_received_notification: Some(false), + }), SubscriptionId::Number(2 as u64), past_bank_sub2, ); subscriptions.add_signature_subscription( processed_tx.signatures[0], - Some(CommitmentConfig::recent()), + Some(RpcSignatureSubscribeConfig { + commitment: Some(CommitmentConfig::recent()), + enable_received_notification: Some(false), + }), SubscriptionId::Number(3 as u64), processed_sub, ); subscriptions.add_signature_subscription( unprocessed_tx.signatures[0], - Some(CommitmentConfig::recent()), + Some(RpcSignatureSubscribeConfig { + commitment: Some(CommitmentConfig::recent()), + enable_received_notification: Some(false), + }), SubscriptionId::Number(4 as u64), Subscriber::new_test("signatureNotification").0, ); + // Add a subscription that gets `received` notifications + subscriptions.add_signature_subscription( + unprocessed_tx.signatures[0], + Some(RpcSignatureSubscribeConfig { + commitment: Some(CommitmentConfig::recent()), + enable_received_notification: Some(true), + }), + SubscriptionId::Number(5 as u64), + processed_sub3, + ); { let sig_subs = subscriptions @@ -1282,46 +1363,62 @@ pub(crate) mod tests { assert!(sig_subs.contains_key(&processed_tx.signatures[0])); } let mut commitment_slots = CommitmentSlots::default(); - commitment_slots.slot = 1; + let received_slot = 1; + commitment_slots.slot = received_slot; + subscriptions + .notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]])); subscriptions.notify_subscribers(commitment_slots); - let expected_res = RpcSignatureResult { err: None }; - + let expected_res = + RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None }); + let received_expected_res = RpcSignatureResult::ReceivedSignature; struct Notification { slot: Slot, id: u64, } - let expected_notification = |exp: Notification| -> String { - let json = json!({ - "jsonrpc": "2.0", - "method": "signatureNotification", - "params": { - "result": { - "context": { "slot": exp.slot }, - "value": &expected_res, - }, - "subscription": exp.id, - } - }); - serde_json::to_string(&json).unwrap() - }; + let expected_notification = + |exp: Notification, expected_res: &RpcSignatureResult| -> String { + let json = json!({ + "jsonrpc": "2.0", + "method": "signatureNotification", + "params": { + "result": { + "context": { "slot": exp.slot }, + "value": expected_res, + }, + "subscription": exp.id, + } + }); + serde_json::to_string(&json).unwrap() + }; // Expect to receive a notification from bank 1 because this subscription is // looking for 0 confirmations and so checks the current bank - let expected = expected_notification(Notification { slot: 1, id: 1 }); + let expected = expected_notification(Notification { slot: 1, id: 1 }, &expected_res); let (response, _) = robust_poll_or_panic(past_bank_recv1); assert_eq!(expected, response); // Expect to receive a notification from bank 0 because this subscription is // looking for 1 confirmation and so checks the past bank - let expected = expected_notification(Notification { slot: 0, id: 2 }); + let expected = expected_notification(Notification { slot: 0, id: 2 }, &expected_res); let (response, _) = robust_poll_or_panic(past_bank_recv2); assert_eq!(expected, response); - let expected = expected_notification(Notification { slot: 1, id: 3 }); + let expected = expected_notification(Notification { slot: 1, id: 3 }, &expected_res); let (response, _) = robust_poll_or_panic(processed_recv); assert_eq!(expected, response); + // Expect a "received" notification + let expected = expected_notification( + Notification { + slot: received_slot, + id: 5, + }, + &received_expected_res, + ); + let (response, _) = robust_poll_or_panic(processed_recv3); + assert_eq!(expected, response); + // Subscription should be automatically removed after notification let sig_subs = subscriptions .subscriptions @@ -1334,7 +1431,7 @@ pub(crate) mod tests { // Unprocessed signature subscription should not be removed assert_eq!( sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(), - 1 + 2 ); } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 5764da83a1..95d35ccc3c 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -8,6 +8,7 @@ use crate::{ cluster_info::ClusterInfo, cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker}, cluster_slots::ClusterSlots, + completed_data_sets_service::CompletedDataSetsSender, ledger_cleanup_service::LedgerCleanupService, poh_recorder::PohRecorder, replay_stage::{ReplayStage, ReplayStageConfig}, @@ -100,6 +101,7 @@ impl Tvu { retransmit_slots_sender: RetransmitSlotsSender, verified_vote_receiver: VerifiedVoteReceiver, replay_vote_sender: ReplayVoteSender, + completed_data_sets_sender: CompletedDataSetsSender, tvu_config: TvuConfig, ) -> Self { let keypair: Arc = cluster_info.keypair.clone(); @@ -152,6 +154,7 @@ impl Tvu { duplicate_slots_reset_sender, verified_vote_receiver, tvu_config.repair_validators, + completed_data_sets_sender, ); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); @@ -249,6 +252,7 @@ pub mod tests { }; use serial_test_derive::serial; use solana_ledger::{ + blockstore::BlockstoreSignals, create_new_tmp_ledger, genesis_utils::{create_genesis_config, GenesisConfigInfo}, }; @@ -275,9 +279,13 @@ pub mod tests { let cref1 = Arc::new(cluster_info1); let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config); - let (blockstore, l_receiver, completed_slots_receiver) = - Blockstore::open_with_signal(&blockstore_path, None) - .expect("Expected to successfully open ledger"); + let BlockstoreSignals { + blockstore, + ledger_signal_receiver, + completed_slots_receiver, + .. + } = Blockstore::open_with_signal(&blockstore_path, None) + .expect("Expected to successfully open ledger"); let blockstore = Arc::new(blockstore); let bank = bank_forks.working_bank(); let (exit, poh_recorder, poh_service, _entry_receiver) = @@ -288,6 +296,7 @@ pub mod tests { let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); let (_verified_vote_sender, verified_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); + let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let tvu = Tvu::new( &vote_keypair.pubkey(), @@ -303,7 +312,7 @@ pub mod tests { } }, blockstore, - l_receiver, + ledger_signal_receiver, &Arc::new(RpcSubscriptions::new( &exit, bank_forks.clone(), @@ -322,6 +331,7 @@ pub mod tests { retransmit_slots_sender, verified_vote_receiver, replay_vote_sender, + completed_data_sets_sender, TvuConfig::default(), ); exit.store(true, Ordering::Relaxed); diff --git a/core/src/validator.rs b/core/src/validator.rs index 023fa6254e..fdf35de172 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -4,6 +4,7 @@ use crate::{ broadcast_stage::BroadcastStageType, cluster_info::{ClusterInfo, Node}, cluster_info_vote_listener::VoteTracker, + completed_data_sets_service::CompletedDataSetsService, contact_info::ContactInfo, gossip_service::{discover_cluster, GossipService}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, @@ -21,12 +22,12 @@ use crate::{ transaction_status_service::TransactionStatusService, tvu::{Sockets, Tvu, TvuConfig}, }; -use crossbeam_channel::unbounded; +use crossbeam_channel::{bounded, unbounded}; use rand::{thread_rng, Rng}; use solana_banks_server::rpc_banks_service::RpcBanksService; use solana_ledger::{ bank_forks_utils, - blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType}, + blockstore::{Blockstore, BlockstoreSignals, CompletedSlotsReceiver, PurgeType}, blockstore_db::BlockstoreRecoveryMode, blockstore_processor::{self, TransactionStatusSender}, create_new_tmp_ledger, @@ -64,6 +65,8 @@ use std::{ time::Duration, }; +pub const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000; + #[derive(Clone, Debug)] pub struct ValidatorConfig { pub dev_halt_at_slot: Option, @@ -156,6 +159,7 @@ pub struct Validator { rewards_recorder_service: Option, gossip_service: GossipService, serve_repair_service: ServeRepairService, + completed_data_sets_service: CompletedDataSetsService, snapshot_packager_service: Option, poh_recorder: Arc>, poh_service: PohService, @@ -284,6 +288,15 @@ impl Validator { block_commitment_cache.clone(), )); + let (completed_data_sets_sender, completed_data_sets_receiver) = + bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL); + let completed_data_sets_service = CompletedDataSetsService::new( + completed_data_sets_receiver, + blockstore.clone(), + subscriptions.clone(), + &exit, + ); + let rpc_override_health_check = Arc::new(AtomicBool::new(false)); let rpc_service = config .rpc_ports @@ -468,6 +481,7 @@ impl Validator { retransmit_slots_sender, verified_vote_receiver, replay_vote_sender.clone(), + completed_data_sets_sender, TvuConfig { max_ledger_shreds: config.max_ledger_shreds, halt_on_trusted_validators_accounts_hash_mismatch: config @@ -509,6 +523,7 @@ impl Validator { transaction_status_service, rewards_recorder_service, snapshot_packager_service, + completed_data_sets_service, tpu, tvu, poh_service, @@ -579,6 +594,7 @@ impl Validator { self.serve_repair_service.join()?; self.tpu.join()?; self.tvu.join()?; + self.completed_data_sets_service.join()?; self.ip_echo_server.shutdown_now(); Ok(()) @@ -622,9 +638,13 @@ fn new_banks_from_ledger( } } - let (mut blockstore, ledger_signal_receiver, completed_slots_receiver) = - Blockstore::open_with_signal(ledger_path, config.wal_recovery_mode.clone()) - .expect("Failed to open ledger database"); + let BlockstoreSignals { + mut blockstore, + ledger_signal_receiver, + completed_slots_receiver, + .. + } = Blockstore::open_with_signal(ledger_path, config.wal_recovery_mode.clone()) + .expect("Failed to open ledger database"); blockstore.set_no_compaction(config.no_rocksdb_compaction); let process_options = blockstore_processor::ProcessOptions { diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 090de53390..ff72bbbef6 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -5,6 +5,7 @@ use crate::{ cluster_info::ClusterInfo, cluster_info_vote_listener::VerifiedVoteReceiver, cluster_slots::ClusterSlots, + completed_data_sets_service::CompletedDataSetsSender, repair_response, repair_service::{RepairInfo, RepairService}, result::{Error, Result}, @@ -123,6 +124,7 @@ fn run_insert( leader_schedule_cache: &Arc, handle_duplicate: F, metrics: &mut BlockstoreInsertionMetrics, + completed_data_sets_sender: &CompletedDataSetsSender, ) -> Result<()> where F: Fn(Shred), @@ -138,13 +140,13 @@ where let mut i = 0; shreds.retain(|shred| (verify_repair(&shred, &repair_infos[i]), i += 1).0); - blockstore.insert_shreds_handle_duplicate( + completed_data_sets_sender.try_send(blockstore.insert_shreds_handle_duplicate( shreds, Some(leader_schedule_cache), false, &handle_duplicate, metrics, - )?; + )?)?; Ok(()) } @@ -302,6 +304,7 @@ impl WindowService { shred_filter: F, cluster_slots: Arc, verified_vote_receiver: VerifiedVoteReceiver, + completed_data_sets_sender: CompletedDataSetsSender, ) -> WindowService where F: 'static @@ -333,6 +336,7 @@ impl WindowService { leader_schedule_cache, insert_receiver, duplicate_sender, + completed_data_sets_sender, ); let t_window = Self::start_recv_window_thread( @@ -387,6 +391,7 @@ impl WindowService { leader_schedule_cache: &Arc, insert_receiver: CrossbeamReceiver<(Vec, Vec>)>, duplicate_sender: CrossbeamSender, + completed_data_sets_sender: CompletedDataSetsSender, ) -> JoinHandle<()> { let exit = exit.clone(); let blockstore = blockstore.clone(); @@ -415,6 +420,7 @@ impl WindowService { &leader_schedule_cache, &handle_duplicate, &mut metrics, + &completed_data_sets_sender, ) { if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { break; diff --git a/core/tests/rpc.rs b/core/tests/rpc.rs index 9bd5d44d21..8c52ef5de9 100644 --- a/core/tests/rpc.rs +++ b/core/tests/rpc.rs @@ -285,8 +285,12 @@ fn test_rpc_subscriptions() { let timeout = deadline.saturating_duration_since(Instant::now()); match status_receiver.recv_timeout(timeout) { Ok((sig, result)) => { - assert!(result.value.err.is_none()); - assert!(signature_set.remove(&sig)); + if let RpcSignatureResult::ProcessedSignatureResult(result) = result.value { + assert!(result.err.is_none()); + assert!(signature_set.remove(&sig)); + } else { + panic!("Unexpected result"); + } } Err(_err) => { assert!( diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 3b46305129..e4c87516de 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -108,6 +108,19 @@ impl std::fmt::Display for InsertDataShredError { } } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct CompletedDataSetInfo { + pub slot: Slot, + pub start_index: u32, + pub end_index: u32, +} + +pub struct BlockstoreSignals { + pub blockstore: Blockstore, + pub ledger_signal_receiver: Receiver, + pub completed_slots_receiver: CompletedSlotsReceiver, +} + // ledger window pub struct Blockstore { db: Arc, @@ -339,16 +352,20 @@ impl Blockstore { pub fn open_with_signal( ledger_path: &Path, recovery_mode: Option, - ) -> Result<(Self, Receiver, CompletedSlotsReceiver)> { + ) -> Result { let mut blockstore = Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly, recovery_mode)?; - let (signal_sender, signal_receiver) = sync_channel(1); + let (ledger_signal_sender, ledger_signal_receiver) = sync_channel(1); let (completed_slots_sender, completed_slots_receiver) = sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL); - blockstore.new_shreds_signals = vec![signal_sender]; + blockstore.new_shreds_signals = vec![ledger_signal_sender]; blockstore.completed_slots_senders = vec![completed_slots_sender]; - Ok((blockstore, signal_receiver, completed_slots_receiver)) + Ok(BlockstoreSignals { + blockstore, + ledger_signal_receiver, + completed_slots_receiver, + }) } pub fn add_tree( @@ -723,7 +740,7 @@ impl Blockstore { is_trusted: bool, handle_duplicate: &F, metrics: &mut BlockstoreInsertionMetrics, - ) -> Result<()> + ) -> Result> where F: Fn(Shred), { @@ -746,24 +763,30 @@ impl Blockstore { let mut start = Measure::start("Shred insertion"); let mut num_inserted = 0; let mut index_meta_time = 0; + let mut newly_completed_data_sets: Vec = vec![]; shreds.into_iter().for_each(|shred| { if shred.is_data() { - if self - .check_insert_data_shred( - shred, - &mut erasure_metas, - &mut index_working_set, - &mut slot_meta_working_set, - &mut write_batch, - &mut just_inserted_data_shreds, - &mut index_meta_time, - is_trusted, - handle_duplicate, - leader_schedule, - false, - ) - .is_ok() - { + let shred_slot = shred.slot(); + if let Ok(completed_data_sets) = self.check_insert_data_shred( + shred, + &mut erasure_metas, + &mut index_working_set, + &mut slot_meta_working_set, + &mut write_batch, + &mut just_inserted_data_shreds, + &mut index_meta_time, + is_trusted, + handle_duplicate, + leader_schedule, + false, + ) { + newly_completed_data_sets.extend(completed_data_sets.into_iter().map( + |(start_index, end_index)| CompletedDataSetInfo { + slot: shred_slot, + start_index, + end_index, + }, + )); num_inserted += 1; } } else if shred.is_code() { @@ -800,6 +823,7 @@ impl Blockstore { num_recovered = recovered_data.len(); recovered_data.into_iter().for_each(|shred| { if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) { + let shred_slot = shred.slot(); if shred.verify(&leader) { match self.check_insert_data_shred( shred, @@ -821,7 +845,16 @@ impl Blockstore { num_recovered_failed_invalid += 1; } Err(InsertDataShredError::BlockstoreError(_)) => {} - Ok(()) => { + Ok(completed_data_sets) => { + newly_completed_data_sets.extend( + completed_data_sets.into_iter().map( + |(start_index, end_index)| CompletedDataSetInfo { + slot: shred_slot, + start_index, + end_index, + }, + ), + ); num_recovered_inserted += 1; } } @@ -902,7 +935,7 @@ impl Blockstore { metrics.num_recovered_exists = num_recovered_exists; metrics.index_meta_time += index_meta_time; - Ok(()) + Ok(newly_completed_data_sets) } pub fn clear_unconfirmed_slot(&self, slot: Slot) { @@ -934,7 +967,7 @@ impl Blockstore { shreds: Vec, leader_schedule: Option<&Arc>, is_trusted: bool, - ) -> Result<()> { + ) -> Result> { self.insert_shreds_handle_duplicate( shreds, leader_schedule, @@ -1044,7 +1077,7 @@ impl Blockstore { handle_duplicate: &F, leader_schedule: Option<&Arc>, is_recovered: bool, - ) -> std::result::Result<(), InsertDataShredError> + ) -> std::result::Result, InsertDataShredError> where F: Fn(Shred), { @@ -1076,7 +1109,8 @@ impl Blockstore { } let set_index = u64::from(shred.common_header.fec_set_index); - self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)?; + let newly_completed_data_sets = + self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)?; just_inserted_data_shreds.insert((slot, shred_index), shred); index_meta_working_set_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true; @@ -1089,7 +1123,7 @@ impl Blockstore { erasure_metas.insert((slot, set_index), meta); } } - Ok(()) + Ok(newly_completed_data_sets) } fn should_insert_coding_shred( @@ -1205,7 +1239,7 @@ impl Blockstore { data_index: &mut ShredIndex, shred: &Shred, write_batch: &mut WriteBatch, - ) -> Result<()> { + ) -> Result> { let slot = shred.slot(); let index = u64::from(shred.index()); @@ -1240,13 +1274,15 @@ impl Blockstore { // Commit step: commit all changes to the mutable structures at once, or none at all. // We don't want only a subset of these changes going through. write_batch.put_bytes::((slot, index), &shred.payload)?; - update_slot_meta( + data_index.set_present(index, true); + let newly_completed_data_sets = update_slot_meta( last_in_slot, last_in_data, slot_meta, index as u32, new_consumed, shred.reference_tick(), + &data_index, ); if slot_meta.is_full() { info!( @@ -1254,9 +1290,8 @@ impl Blockstore { slot_meta.slot, slot_meta.last_index ); } - data_index.set_present(index, true); trace!("inserted shred into slot {:?} and index {:?}", slot, index); - Ok(()) + Ok(newly_completed_data_sets) } pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result>> { @@ -2310,7 +2345,12 @@ impl Blockstore { completed_ranges .par_iter() .map(|(start_index, end_index)| { - self.get_entries_in_data_block(slot, *start_index, *end_index, &slot_meta) + self.get_entries_in_data_block( + slot, + *start_index, + *end_index, + Some(&slot_meta), + ) }) .collect() }) @@ -2375,12 +2415,12 @@ impl Blockstore { completed_data_ranges } - fn get_entries_in_data_block( + pub fn get_entries_in_data_block( &self, slot: Slot, start_index: u32, end_index: u32, - slot_meta: &SlotMeta, + slot_meta: Option<&SlotMeta>, ) -> Result> { let data_shred_cf = self.db.column::(); @@ -2390,23 +2430,33 @@ impl Blockstore { data_shred_cf .get_bytes((slot, u64::from(i))) .and_then(|serialized_shred| { - Shred::new_from_serialized_shred(serialized_shred.unwrap_or_else(|| { - panic!( - "Shred with - slot: {}, - index: {}, - consumed: {}, - completed_indexes: {:?} - must exist if shred index was included in a range: {} {}", - slot, - i, - slot_meta.consumed, - slot_meta.completed_data_indexes, - start_index, - end_index - ) - })) - .map_err(|err| { + if serialized_shred.is_none() { + if let Some(slot_meta) = slot_meta { + panic!( + "Shred with + slot: {}, + index: {}, + consumed: {}, + completed_indexes: {:?} + must exist if shred index was included in a range: {} {}", + slot, + i, + slot_meta.consumed, + slot_meta.completed_data_indexes, + start_index, + end_index + ); + } else { + return Err(BlockstoreError::InvalidShredData(Box::new( + bincode::ErrorKind::Custom(format!( + "Missing shred for slot {}, index {}", + slot, i + )), + ))); + } + } + + Shred::new_from_serialized_shred(serialized_shred.unwrap()).map_err(|err| { BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom( format!( "Could not reconstruct shred from shred payload: {:?}", @@ -2450,8 +2500,13 @@ impl Blockstore { completed_ranges .par_iter() .map(|(start_index, end_index)| { - self.get_entries_in_data_block(slot, *start_index, *end_index, &slot_meta) - .unwrap_or_default() + self.get_entries_in_data_block( + slot, + *start_index, + *end_index, + Some(&slot_meta), + ) + .unwrap_or_default() }) .collect() }) @@ -2601,6 +2656,89 @@ impl Blockstore { } } +// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a +// data set is complete, return the range of shred indexes [start_index, end_index] +// for that completed data set. +fn update_completed_data_indexes( + is_last_in_data: bool, + new_shred_index: u32, + received_data_shreds: &ShredIndex, + // Sorted array of shred indexes marked data complete + completed_data_indexes: &mut Vec, +) -> Vec<(u32, u32)> { + let mut first_greater_pos = None; + let mut prev_completed_shred_index = None; + // Find the first item in `completed_data_indexes > new_shred_index` + for (i, completed_data_index) in completed_data_indexes.iter().enumerate() { + // `completed_data_indexes` should be sorted from smallest to largest + assert!( + prev_completed_shred_index.is_none() + || *completed_data_index > prev_completed_shred_index.unwrap() + ); + if *completed_data_index > new_shred_index { + first_greater_pos = Some(i); + break; + } + prev_completed_shred_index = Some(*completed_data_index); + } + + // Consecutive entries i, k, j in this vector represent potential ranges [i, k), + // [k, j) that could be completed data ranges + let mut check_ranges: Vec = vec![prev_completed_shred_index + .map(|completed_data_shred_index| completed_data_shred_index + 1) + .unwrap_or(0)]; + let mut first_greater_data_complete_index = + first_greater_pos.map(|i| completed_data_indexes[i]); + + // `new_shred_index` is data complete, so need to insert here into the + // `completed_data_indexes` + if is_last_in_data { + if first_greater_pos.is_some() { + // If there exists a data complete shred greater than `new_shred_index`, + // and the new shred is marked data complete, then the range + // [new_shred_index + 1, completed_data_indexes[pos]] may be complete, + // so add that range to check + check_ranges.push(new_shred_index + 1); + } + completed_data_indexes.insert( + first_greater_pos.unwrap_or_else(|| { + // If `first_greater_pos` is none, then there was no greater + // data complete index so mark this new shred's index as the latest data + // complete index + first_greater_data_complete_index = Some(new_shred_index); + completed_data_indexes.len() + }), + new_shred_index, + ); + } + + if first_greater_data_complete_index.is_none() { + // That means new_shred_index > all known completed data indexes and + // new shred not data complete, which means the data set of that new + // shred is not data complete + return vec![]; + } + + check_ranges.push(first_greater_data_complete_index.unwrap() + 1); + let mut completed_data_ranges = vec![]; + for range in check_ranges.windows(2) { + let mut is_complete = true; + for shred_index in range[0]..range[1] { + // If we're missing any shreds, the data set cannot be confirmed + // to be completed, so check the next range + if !received_data_shreds.is_present(shred_index as u64) { + is_complete = false; + break; + } + } + if is_complete { + completed_data_ranges.push((range[0], range[1] - 1)); + } + } + + completed_data_ranges +} + fn update_slot_meta( is_last_in_slot: bool, is_last_in_data: bool, @@ -2608,7 +2746,8 @@ fn update_slot_meta( index: u32, new_consumed: u64, reference_tick: u8, -) { + received_data_shreds: &ShredIndex, +) -> Vec<(u32, u32)> { let maybe_first_insert = slot_meta.received == 0; // Index is zero-indexed, while the "received" height starts from 1, // so received = index + 1 for the same shred. @@ -2633,15 +2772,12 @@ fn update_slot_meta( } }; - if is_last_in_slot || is_last_in_data { - let position = slot_meta - .completed_data_indexes - .iter() - .position(|completed_data_index| *completed_data_index > index) - .unwrap_or_else(|| slot_meta.completed_data_indexes.len()); - - slot_meta.completed_data_indexes.insert(position, index); - } + update_completed_data_indexes( + is_last_in_slot || is_last_in_data, + index, + received_data_shreds, + &mut slot_meta.completed_data_indexes, + ) } fn get_index_meta_entry<'a>( @@ -3973,11 +4109,49 @@ pub mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } + #[test] + fn test_data_set_completed_on_insert() { + let ledger_path = get_tmp_ledger_path!(); + let BlockstoreSignals { blockstore, .. } = + Blockstore::open_with_signal(&ledger_path, None).unwrap(); + + // Create enough entries to fill 2 shreds, only the later one is data complete + let slot = 0; + let num_entries = max_ticks_per_n_shreds(1, None) + 1; + let entries = create_ticks(num_entries, slot, Hash::default()); + let shreds = entries_to_test_shreds(entries, slot, 0, true, 0); + let num_shreds = shreds.len(); + assert!(num_shreds > 1); + assert!(blockstore + .insert_shreds(shreds[1..].to_vec(), None, false) + .unwrap() + .is_empty()); + assert_eq!( + blockstore + .insert_shreds(vec![shreds[0].clone()], None, false) + .unwrap(), + vec![CompletedDataSetInfo { + slot, + start_index: 0, + end_index: num_shreds as u32 - 1 + }] + ); + // Inserting shreds again doesn't trigger notification + assert!(blockstore + .insert_shreds(shreds, None, false) + .unwrap() + .is_empty()); + } + #[test] pub fn test_new_shreds_signal() { // Initialize ledger let ledger_path = get_tmp_ledger_path!(); - let (ledger, recvr, _) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); + let BlockstoreSignals { + blockstore: ledger, + ledger_signal_receiver: recvr, + .. + } = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let ledger = Arc::new(ledger); let entries_per_slot = 50; @@ -4057,7 +4231,11 @@ pub mod tests { pub fn test_completed_shreds_signal() { // Initialize ledger let ledger_path = get_tmp_ledger_path!(); - let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); + let BlockstoreSignals { + blockstore: ledger, + completed_slots_receiver: recvr, + .. + } = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let ledger = Arc::new(ledger); let entries_per_slot = 10; @@ -4079,7 +4257,11 @@ pub mod tests { pub fn test_completed_shreds_signal_orphans() { // Initialize ledger let ledger_path = get_tmp_ledger_path!(); - let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); + let BlockstoreSignals { + blockstore: ledger, + completed_slots_receiver: recvr, + .. + } = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let ledger = Arc::new(ledger); let entries_per_slot = 10; @@ -4119,7 +4301,11 @@ pub mod tests { pub fn test_completed_shreds_signal_many() { // Initialize ledger let ledger_path = get_tmp_ledger_path!(); - let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); + let BlockstoreSignals { + blockstore: ledger, + completed_slots_receiver: recvr, + .. + } = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let ledger = Arc::new(ledger); let entries_per_slot = 10; @@ -7049,4 +7235,64 @@ pub mod tests { } Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } + + #[test] + fn test_update_completed_data_indexes() { + let mut completed_data_indexes: Vec = vec![]; + let mut shred_index = ShredIndex::default(); + + for i in 0..10 { + shred_index.set_present(i as u64, true); + assert_eq!( + update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes), + vec![(i, i)] + ); + assert_eq!(completed_data_indexes, (0..=i).collect::>()); + } + } + + #[test] + fn test_update_completed_data_indexes_out_of_order() { + let mut completed_data_indexes = vec![]; + let mut shred_index = ShredIndex::default(); + + shred_index.set_present(4, true); + assert!( + update_completed_data_indexes(false, 4, &shred_index, &mut completed_data_indexes) + .is_empty() + ); + assert!(completed_data_indexes.is_empty()); + + shred_index.set_present(2, true); + assert!( + update_completed_data_indexes(false, 2, &shred_index, &mut completed_data_indexes) + .is_empty() + ); + assert!(completed_data_indexes.is_empty()); + + shred_index.set_present(3, true); + assert!( + update_completed_data_indexes(true, 3, &shred_index, &mut completed_data_indexes) + .is_empty() + ); + assert_eq!(completed_data_indexes, vec![3]); + + // Inserting data complete shred 1 now confirms the range of shreds [2, 3] + // is part of the same data set + shred_index.set_present(1, true); + assert_eq!( + update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes), + vec![(2, 3)] + ); + assert_eq!(completed_data_indexes, vec![1, 3]); + + // Inserting data complete shred 0 now confirms the range of shreds [0] + // is part of the same data set + shred_index.set_present(0, true); + assert_eq!( + update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes), + vec![(0, 0), (1, 1)] + ); + assert_eq!(completed_data_indexes, vec![0, 1, 3]); + } } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 9e5c57b969..a30bc00ce2 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -2,10 +2,14 @@ use assert_matches::assert_matches; use gag::BufferRedirect; use log::*; use serial_test_derive::serial; -use solana_client::rpc_client::RpcClient; -use solana_client::thin_client::create_client; +use solana_client::{ + pubsub_client::{PubsubClient, SignatureResult}, + rpc_client::RpcClient, + thin_client::create_client, +}; use solana_core::{ broadcast_stage::BroadcastStageType, + cluster_info::VALIDATOR_PORT_RANGE, consensus::{SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, gossip_service::discover_cluster, validator::ValidatorConfig, @@ -34,6 +38,7 @@ use solana_sdk::{ poh_config::PohConfig, pubkey::Pubkey, signature::{Keypair, Signer}, + system_transaction, }; use std::sync::atomic::{AtomicBool, Ordering}; use std::{ @@ -132,6 +137,71 @@ fn test_spend_and_verify_all_nodes_3() { ); } +#[test] +#[serial] +fn test_local_cluster_signature_subscribe() { + solana_logger::setup(); + let num_nodes = 2; + let cluster = LocalCluster::new_with_equal_stakes(num_nodes, 10_000, 100); + let nodes = cluster.get_node_pubkeys(); + + // Get non leader + let non_bootstrap_id = nodes + .into_iter() + .find(|id| *id != cluster.entry_point_info.id) + .unwrap(); + let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); + + let tx_client = create_client( + non_bootstrap_info.client_facing_addr(), + VALIDATOR_PORT_RANGE, + ); + let (blockhash, _fee_calculator, _last_valid_slot) = tx_client + .get_recent_blockhash_with_commitment(CommitmentConfig::recent()) + .unwrap(); + + let mut transaction = + system_transaction::transfer(&cluster.funding_keypair, &Pubkey::new_rand(), 10, blockhash); + + let (mut sig_subscribe_client, receiver) = PubsubClient::signature_subscribe( + &format!("ws://{}", &non_bootstrap_info.rpc_pubsub.to_string()), + &transaction.signatures[0], + ) + .unwrap(); + + tx_client + .retry_transfer(&cluster.funding_keypair, &mut transaction, 5) + .unwrap(); + + let mut got_received_notification = false; + loop { + let responses: Vec<_> = receiver.try_iter().collect(); + let mut should_break = false; + for response in responses { + match response.value { + SignatureResult::ProcessedSignatureResult(_) => { + should_break = true; + break; + } + SignatureResult::ReceivedSignature => { + got_received_notification = true; + } + } + } + + if should_break { + break; + } + sleep(Duration::from_millis(100)); + } + + // If we don't drop the cluster, the blocking web socket service + // won't return, and the `sig_subscribe_client` won't shut down + drop(cluster); + sig_subscribe_client.shutdown().unwrap(); + assert!(got_received_notification); +} + #[test] #[allow(unused_attributes)] #[ignore]