Detect and notify when deserializable shreds are available (#11816)

* Add logic to check for complete data ranges

* Add RPC signature notification

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin 2020-09-01 22:06:06 -07:00 committed by GitHub
parent 7568bb573f
commit 1c1a3f979d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 854 additions and 148 deletions

View File

@ -10,10 +10,10 @@ homepage = "https://solana.com/"
[dependencies] [dependencies]
log = "0.4.6" log = "0.4.6"
rayon = "1.4.0" rayon = "1.4.0"
solana-logger = { path = "../logger", version = "1.4.0" } solana-logger = { path = "../logger", version = "1.3.6" }
solana-runtime = { path = "../runtime", version = "1.4.0" } solana-runtime = { path = "../runtime", version = "1.3.6" }
solana-measure = { path = "../measure", version = "1.4.0" } solana-measure = { path = "../measure", version = "1.3.6" }
solana-sdk = { path = "../sdk", version = "1.4.0" } solana-sdk = { path = "../sdk", version = "1.3.6" }
rand = "0.7.0" rand = "0.7.0"
clap = "2.33.1" clap = "2.33.1"
crossbeam-channel = "0.4" crossbeam-channel = "0.4"

View File

@ -5,6 +5,9 @@ use serde_json::{
value::Value::{Number, Object}, value::Value::{Number, Object},
Map, Value, Map, Value,
}; };
use solana_sdk::{
commitment_config::CommitmentConfig, signature::Signature, transaction::TransactionError,
};
use std::{ use std::{
marker::PhantomData, marker::PhantomData,
sync::{ sync::{
@ -18,6 +21,8 @@ use thiserror::Error;
use tungstenite::{client::AutoStream, connect, Message, WebSocket}; use tungstenite::{client::AutoStream, connect, Message, WebSocket};
use url::{ParseError, Url}; use url::{ParseError, Url};
type PubsubSignatureResponse = PubsubClientSubscription<RpcResponse<SignatureResult>>;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum PubsubClientError { pub enum PubsubClientError {
#[error("url parse error")] #[error("url parse error")]
@ -33,6 +38,30 @@ pub enum PubsubClientError {
UnexpectedMessageError, UnexpectedMessageError,
} }
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "camelCase", tag = "type", content = "result")]
pub enum SignatureResult {
ProcessedSignatureResult(ProcessedSignatureResult),
ReceivedSignature,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct RpcResponseContext {
pub slot: u64,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ProcessedSignatureResult {
pub err: Option<TransactionError>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RpcResponse<T> {
pub context: RpcResponseContext,
pub value: T,
}
#[derive(Serialize, Deserialize, PartialEq, Clone, Debug)] #[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
pub struct SlotInfoMessage { pub struct SlotInfoMessage {
pub parent: u64, pub parent: u64,
@ -40,6 +69,14 @@ pub struct SlotInfoMessage {
pub slot: u64, pub slot: u64,
} }
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcSignatureSubscribeConfig {
#[serde(flatten)]
pub commitment: Option<CommitmentConfig>,
pub enable_received_notification: Option<bool>,
}
pub struct PubsubClientSubscription<T> pub struct PubsubClientSubscription<T>
where where
T: DeserializeOwned, T: DeserializeOwned,
@ -73,18 +110,12 @@ where
{ {
fn send_subscribe( fn send_subscribe(
writable_socket: &Arc<RwLock<WebSocket<AutoStream>>>, writable_socket: &Arc<RwLock<WebSocket<AutoStream>>>,
operation: &str, body: String,
) -> Result<u64, PubsubClientError> { ) -> Result<u64, PubsubClientError> {
let method = format!("{}Subscribe", operation);
writable_socket writable_socket
.write() .write()
.unwrap() .unwrap()
.write_message(Message::Text( .write_message(Message::Text(body))?;
json!({
"jsonrpc":"2.0","id":1,"method":method,"params":[]
})
.to_string(),
))?;
let message = writable_socket.write().unwrap().read_message()?; let message = writable_socket.write().unwrap().read_message()?;
Self::extract_subscription_id(message) Self::extract_subscription_id(message)
} }
@ -148,6 +179,7 @@ where
} }
const SLOT_OPERATION: &str = "slot"; const SLOT_OPERATION: &str = "slot";
const SIGNATURE_OPERATION: &str = "signature";
pub struct PubsubClient {} pub struct PubsubClient {}
@ -171,7 +203,10 @@ impl PubsubClient {
let exit_clone = exit.clone(); let exit_clone = exit.clone();
let subscription_id = PubsubClientSubscription::<SlotInfoMessage>::send_subscribe( let subscription_id = PubsubClientSubscription::<SlotInfoMessage>::send_subscribe(
&socket_clone, &socket_clone,
SLOT_OPERATION, json!({
"jsonrpc":"2.0","id":1,"method":format!("{}Subscribe", SLOT_OPERATION),"params":[]
})
.to_string(),
) )
.unwrap(); .unwrap();
@ -212,6 +247,80 @@ impl PubsubClient {
Ok((result, receiver)) Ok((result, receiver))
} }
pub fn signature_subscribe(
url: &str,
signature: &Signature,
) -> Result<
(
PubsubSignatureResponse,
Receiver<RpcResponse<SignatureResult>>,
),
PubsubClientError,
> {
let url = Url::parse(url)?;
let (socket, _response) = connect(url)?;
let (sender, receiver) = channel::<RpcResponse<SignatureResult>>();
let socket = Arc::new(RwLock::new(socket));
let socket_clone = socket.clone();
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();
let body = json!({
"jsonrpc":"2.0",
"id":1,
"method":format!("{}Subscribe", SIGNATURE_OPERATION),
"params":[
signature.to_string(),
{"enableReceivedNotification": true }
]
})
.to_string();
let subscription_id =
PubsubClientSubscription::<RpcResponse<SignatureResult>>::send_subscribe(
&socket_clone,
body,
)
.unwrap();
let t_cleanup = std::thread::spawn(move || {
loop {
if exit_clone.load(Ordering::Relaxed) {
break;
}
let message: Result<RpcResponse<SignatureResult>, PubsubClientError> =
PubsubClientSubscription::read_message(&socket_clone);
if let Ok(msg) = message {
match sender.send(msg.clone()) {
Ok(_) => (),
Err(err) => {
info!("receive error: {:?}", err);
break;
}
}
} else {
info!("receive error: {:?}", message);
break;
}
}
info!("websocket - exited receive loop");
});
let result: PubsubClientSubscription<RpcResponse<SignatureResult>> =
PubsubClientSubscription {
message_type: PhantomData,
operation: SIGNATURE_OPERATION,
socket,
subscription_id,
t_cleanup: Some(t_cleanup),
exit,
};
Ok((result, receiver))
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -69,6 +69,14 @@ pub enum RpcTokenAccountsFilter {
ProgramId(String), ProgramId(String),
} }
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RpcSignatureSubscribeConfig {
#[serde(flatten)]
pub commitment: Option<CommitmentConfig>,
pub enable_received_notification: Option<bool>,
}
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct RpcGetConfirmedSignaturesForAddress2Config { pub struct RpcGetConfirmedSignaturesForAddress2Config {

View File

@ -94,9 +94,16 @@ pub struct RpcKeyedAccount {
pub account: UiAccount, pub account: UiAccount,
} }
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase", tag = "type", content = "result")]
pub enum RpcSignatureResult {
ProcessedSignatureResult(ProcessedSignatureResult),
ReceivedSignature,
}
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct RpcSignatureResult { pub struct ProcessedSignatureResult {
pub err: Option<TransactionError>, pub err: Option<TransactionError>,
} }

View File

@ -0,0 +1,82 @@
use crate::rpc_subscriptions::RpcSubscriptions;
use crossbeam_channel::{Receiver, RecvTimeoutError, Sender};
use solana_ledger::blockstore::{Blockstore, CompletedDataSetInfo};
use solana_sdk::signature::Signature;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, Builder, JoinHandle},
time::Duration,
};
pub type CompletedDataSetsReceiver = Receiver<Vec<CompletedDataSetInfo>>;
pub type CompletedDataSetsSender = Sender<Vec<CompletedDataSetInfo>>;
pub struct CompletedDataSetsService {
thread_hdl: JoinHandle<()>,
}
impl CompletedDataSetsService {
pub fn new(
completed_sets_receiver: CompletedDataSetsReceiver,
blockstore: Arc<Blockstore>,
rpc_subscriptions: Arc<RpcSubscriptions>,
exit: &Arc<AtomicBool>,
) -> Self {
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("completed-data-set-service".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(RecvTimeoutError::Disconnected) = Self::recv_completed_data_sets(
&completed_sets_receiver,
&blockstore,
&rpc_subscriptions,
) {
break;
}
})
.unwrap();
Self { thread_hdl }
}
fn recv_completed_data_sets(
completed_sets_receiver: &CompletedDataSetsReceiver,
blockstore: &Blockstore,
rpc_subscriptions: &RpcSubscriptions,
) -> Result<(), RecvTimeoutError> {
let completed_data_sets = completed_sets_receiver.recv_timeout(Duration::from_secs(1))?;
for completed_set_info in std::iter::once(completed_data_sets)
.chain(completed_sets_receiver.try_iter())
.flatten()
{
let CompletedDataSetInfo {
slot,
start_index,
end_index,
} = completed_set_info;
match blockstore.get_entries_in_data_block(slot, start_index, end_index, None) {
Ok(entries) => {
let transactions = entries
.into_iter()
.flat_map(|e| e.transactions.into_iter().map(|t| t.signatures[0]))
.collect::<Vec<Signature>>();
if !transactions.is_empty() {
rpc_subscriptions.notify_signatures_received((slot, transactions));
}
}
Err(e) => warn!("completed-data-set-service deserialize error: {:?}", e),
}
}
Ok(())
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}

View File

@ -12,6 +12,7 @@ pub mod banking_stage;
pub mod broadcast_stage; pub mod broadcast_stage;
pub mod cluster_info_vote_listener; pub mod cluster_info_vote_listener;
pub mod commitment_service; pub mod commitment_service;
pub mod completed_data_sets_service;
mod deprecated; mod deprecated;
pub mod shred_fetch_stage; pub mod shred_fetch_stage;
#[macro_use] #[macro_use]

View File

@ -20,6 +20,7 @@ pub enum Error {
ReadyTimeoutError, ReadyTimeoutError,
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
CrossbeamSendError, CrossbeamSendError,
TryCrossbeamSendError,
TryRecvError(std::sync::mpsc::TryRecvError), TryRecvError(std::sync::mpsc::TryRecvError),
Serialize(std::boxed::Box<bincode::ErrorKind>), Serialize(std::boxed::Box<bincode::ErrorKind>),
TransactionError(transaction::TransactionError), TransactionError(transaction::TransactionError),
@ -87,6 +88,11 @@ impl<T> std::convert::From<crossbeam_channel::SendError<T>> for Error {
Error::CrossbeamSendError Error::CrossbeamSendError
} }
} }
impl<T> std::convert::From<crossbeam_channel::TrySendError<T>> for Error {
fn from(_e: crossbeam_channel::TrySendError<T>) -> Error {
Error::TryCrossbeamSendError
}
}
impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error { impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error {
fn from(_e: std::sync::mpsc::SendError<T>) -> Error { fn from(_e: std::sync::mpsc::SendError<T>) -> Error {
Error::SendError Error::SendError

View File

@ -5,6 +5,7 @@ use crate::{
cluster_info_vote_listener::VerifiedVoteReceiver, cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
cluster_slots_service::ClusterSlotsService, cluster_slots_service::ClusterSlotsService,
completed_data_sets_service::CompletedDataSetsSender,
contact_info::ContactInfo, contact_info::ContactInfo,
repair_service::DuplicateSlotsResetSender, repair_service::DuplicateSlotsResetSender,
repair_service::RepairInfo, repair_service::RepairInfo,
@ -419,6 +420,7 @@ impl RetransmitStage {
duplicate_slots_reset_sender: DuplicateSlotsResetSender, duplicate_slots_reset_sender: DuplicateSlotsResetSender,
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
repair_validators: Option<HashSet<Pubkey>>, repair_validators: Option<HashSet<Pubkey>>,
completed_data_sets_sender: CompletedDataSetsSender,
) -> Self { ) -> Self {
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
@ -472,6 +474,7 @@ impl RetransmitStage {
}, },
cluster_slots, cluster_slots,
verified_vote_receiver, verified_vote_receiver,
completed_data_sets_sender,
); );
let thread_hdls = t_retransmit; let thread_hdls = t_retransmit;

View File

@ -6,14 +6,12 @@ use jsonrpc_derive::rpc;
use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId};
use solana_account_decoder::UiAccount; use solana_account_decoder::UiAccount;
use solana_client::{ use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcSignatureResult}, rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcSignatureResult},
}; };
#[cfg(test)] #[cfg(test)]
use solana_runtime::bank_forks::BankForks; use solana_runtime::bank_forks::BankForks;
use solana_sdk::{ use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature,
};
#[cfg(test)] #[cfg(test)]
use std::sync::RwLock; use std::sync::RwLock;
use std::{ use std::{
@ -89,7 +87,7 @@ pub trait RpcSolPubSub {
meta: Self::Metadata, meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcSignatureResult>>, subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
signature_str: String, signature_str: String,
commitment: Option<CommitmentConfig>, config: Option<RpcSignatureSubscribeConfig>,
); );
// Unsubscribe from signature notification subscription. // Unsubscribe from signature notification subscription.
@ -248,7 +246,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
_meta: Self::Metadata, _meta: Self::Metadata,
subscriber: Subscriber<RpcResponse<RpcSignatureResult>>, subscriber: Subscriber<RpcResponse<RpcSignatureResult>>,
signature_str: String, signature_str: String,
commitment: Option<CommitmentConfig>, signature_subscribe_config: Option<RpcSignatureSubscribeConfig>,
) { ) {
info!("signature_subscribe"); info!("signature_subscribe");
match param::<Signature>(&signature_str, "signature") { match param::<Signature>(&signature_str, "signature") {
@ -259,8 +257,12 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
"signature_subscribe: signature={:?} id={:?}", "signature_subscribe: signature={:?} id={:?}",
signature, sub_id signature, sub_id
); );
self.subscriptions self.subscriptions.add_signature_subscription(
.add_signature_subscription(signature, commitment, sub_id, subscriber); signature,
signature_subscribe_config,
sub_id,
subscriber,
);
} }
Err(e) => subscriber.reject(e).unwrap(), Err(e) => subscriber.reject(e).unwrap(),
} }
@ -359,6 +361,7 @@ mod tests {
use jsonrpc_pubsub::{PubSubHandler, Session}; use jsonrpc_pubsub::{PubSubHandler, Session};
use serial_test_derive::serial; use serial_test_derive::serial;
use solana_account_decoder::{parse_account_data::parse_account_data, UiAccountEncoding}; use solana_account_decoder::{parse_account_data::parse_account_data, UiAccountEncoding};
use solana_client::rpc_response::ProcessedSignatureResult;
use solana_runtime::{ use solana_runtime::{
bank::Bank, bank::Bank,
bank_forks::BankForks, bank_forks::BankForks,
@ -369,6 +372,7 @@ mod tests {
}, },
}; };
use solana_sdk::{ use solana_sdk::{
commitment_config::CommitmentConfig,
hash::Hash, hash::Hash,
message::Message, message::Message,
pubkey::Pubkey, pubkey::Pubkey,
@ -442,7 +446,8 @@ mod tests {
// Test signature confirmation notification // Test signature confirmation notification
let (response, _) = robust_poll_or_panic(receiver); let (response, _) = robust_poll_or_panic(receiver);
let expected_res = RpcSignatureResult { err: None }; let expected_res =
RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None });
let expected = json!({ let expected = json!({
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "signatureNotification", "method": "signatureNotification",
@ -454,6 +459,38 @@ mod tests {
"subscription": 0, "subscription": 0,
} }
}); });
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
// Test "received"
let session = create_session();
let (subscriber, _id_receiver, receiver) = Subscriber::new_test("signatureNotification");
rpc.signature_subscribe(
session,
subscriber,
tx.signatures[0].to_string(),
Some(RpcSignatureSubscribeConfig {
commitment: None,
enable_received_notification: Some(true),
}),
);
let received_slot = 1;
rpc.subscriptions
.notify_signatures_received((received_slot, vec![tx.signatures[0]]));
// Test signature confirmation notification
let (response, _) = robust_poll_or_panic(receiver);
let expected_res = RpcSignatureResult::ReceivedSignature;
let expected = json!({
"jsonrpc": "2.0",
"method": "signatureNotification",
"params": {
"result": {
"context": { "slot": received_slot },
"value": expected_res,
},
"subscription": 1,
}
});
assert_eq!(serde_json::to_string(&expected).unwrap(), response); assert_eq!(serde_json::to_string(&expected).unwrap(), response);
} }

View File

@ -10,9 +10,11 @@ use jsonrpc_pubsub::{
use serde::Serialize; use serde::Serialize;
use solana_account_decoder::{parse_token::spl_token_id_v2_0, UiAccount, UiAccountEncoding}; use solana_account_decoder::{parse_token::spl_token_id_v2_0, UiAccount, UiAccountEncoding};
use solana_client::{ use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
rpc_filter::RpcFilterType, rpc_filter::RpcFilterType,
rpc_response::{Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult}, rpc_response::{
ProcessedSignatureResult, Response, RpcKeyedAccount, RpcResponseContext, RpcSignatureResult,
},
}; };
use solana_runtime::{ use solana_runtime::{
bank::Bank, bank::Bank,
@ -67,6 +69,7 @@ enum NotificationEntry {
Frozen(Slot), Frozen(Slot),
Bank(CommitmentSlots), Bank(CommitmentSlots),
Gossip(Slot), Gossip(Slot),
SignaturesReceived((Slot, Vec<Signature>)),
} }
impl std::fmt::Debug for NotificationEntry { impl std::fmt::Debug for NotificationEntry {
@ -79,6 +82,9 @@ impl std::fmt::Debug for NotificationEntry {
NotificationEntry::Bank(commitment_slots) => { NotificationEntry::Bank(commitment_slots) => {
write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot) write!(f, "Bank({{slot: {:?}}})", commitment_slots.slot)
} }
NotificationEntry::SignaturesReceived(slot_signatures) => {
write!(f, "SignaturesReceived({:?})", slot_signatures)
}
NotificationEntry::Gossip(slot) => write!(f, "Gossip({:?})", slot), NotificationEntry::Gossip(slot) => write!(f, "Gossip({:?})", slot),
} }
} }
@ -108,7 +114,10 @@ type RpcProgramSubscriptions = RwLock<
>, >,
>; >;
type RpcSignatureSubscriptions = RwLock< type RpcSignatureSubscriptions = RwLock<
HashMap<Signature, HashMap<SubscriptionId, SubscriptionData<Response<RpcSignatureResult>, ()>>>, HashMap<
Signature,
HashMap<SubscriptionId, SubscriptionData<Response<RpcSignatureResult>, bool>>,
>,
>; >;
type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>; type RpcSlotSubscriptions = RwLock<HashMap<SubscriptionId, Sink<SlotInfo>>>;
type RpcVoteSubscriptions = RwLock<HashMap<SubscriptionId, Sink<RpcVote>>>; type RpcVoteSubscriptions = RwLock<HashMap<SubscriptionId, Sink<RpcVote>>>;
@ -134,13 +143,11 @@ fn add_subscription<K, S, T>(
last_notified_slot: RwLock::new(last_notified_slot), last_notified_slot: RwLock::new(last_notified_slot),
config, config,
}; };
if let Some(current_hashmap) = subscriptions.get_mut(&hashmap_key) {
current_hashmap.insert(sub_id, subscription_data); subscriptions
return; .entry(hashmap_key)
} .or_default()
let mut hashmap = HashMap::new(); .insert(sub_id, subscription_data);
hashmap.insert(sub_id, subscription_data);
subscriptions.insert(hashmap_key, hashmap);
} }
fn remove_subscription<K, S, T>( fn remove_subscription<K, S, T>(
@ -279,15 +286,15 @@ fn filter_signature_result(
result: Option<transaction::Result<()>>, result: Option<transaction::Result<()>>,
_signature: &Signature, _signature: &Signature,
last_notified_slot: Slot, last_notified_slot: Slot,
_config: Option<()>, _config: Option<bool>,
_bank: Option<Arc<Bank>>, _bank: Option<Arc<Bank>>,
) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) { ) -> (Box<dyn Iterator<Item = RpcSignatureResult>>, Slot) {
( (
Box::new( Box::new(result.into_iter().map(|result| {
result RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult {
.into_iter() err: result.err(),
.map(|result| RpcSignatureResult { err: result.err() }), })
), })),
last_notified_slot, last_notified_slot,
) )
} }
@ -629,13 +636,18 @@ impl RpcSubscriptions {
pub fn add_signature_subscription( pub fn add_signature_subscription(
&self, &self,
signature: Signature, signature: Signature,
commitment: Option<CommitmentConfig>, signature_subscribe_config: Option<RpcSignatureSubscribeConfig>,
sub_id: SubscriptionId, sub_id: SubscriptionId,
subscriber: Subscriber<Response<RpcSignatureResult>>, subscriber: Subscriber<Response<RpcSignatureResult>>,
) { ) {
let (commitment, enable_received_notification) = signature_subscribe_config
.map(|config| (config.commitment, config.enable_received_notification))
.unwrap_or((None, Some(false)));
let commitment_level = commitment let commitment_level = commitment
.unwrap_or_else(CommitmentConfig::recent) .unwrap_or_else(CommitmentConfig::recent)
.commitment; .commitment;
let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip { let mut subscriptions = if commitment_level == CommitmentLevel::SingleGossip {
self.subscriptions self.subscriptions
.gossip_signature_subscriptions .gossip_signature_subscriptions
@ -651,7 +663,7 @@ impl RpcSubscriptions {
sub_id, sub_id,
subscriber, subscriber,
0, // last_notified_slot is not utilized for signature subscriptions 0, // last_notified_slot is not utilized for signature subscriptions
None, enable_received_notification,
); );
} }
@ -696,6 +708,10 @@ impl RpcSubscriptions {
self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root })); self.enqueue_notification(NotificationEntry::Slot(SlotInfo { slot, parent, root }));
} }
pub fn notify_signatures_received(&self, slot_signatures: (Slot, Vec<Signature>)) {
self.enqueue_notification(NotificationEntry::SignaturesReceived(slot_signatures));
}
pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<RpcVote>) { pub fn add_vote_subscription(&self, sub_id: SubscriptionId, subscriber: Subscriber<RpcVote>) {
let sink = subscriber.assign_id(sub_id.clone()).unwrap(); let sink = subscriber.assign_id(sub_id.clone()).unwrap();
let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap(); let mut subscriptions = self.subscriptions.vote_subscriptions.write().unwrap();
@ -840,6 +856,13 @@ impl RpcSubscriptions {
); );
} }
} }
NotificationEntry::SignaturesReceived(slot_signatures) => {
RpcSubscriptions::process_signatures_received(
&slot_signatures,
&subscriptions.signature_subscriptions,
&notifier,
)
}
}, },
Err(RecvTimeoutError::Timeout) => { Err(RecvTimeoutError::Timeout) => {
// not a problem - try reading again // not a problem - try reading again
@ -939,6 +962,40 @@ impl RpcSubscriptions {
} }
} }
fn process_signatures_received(
(received_slot, signatures): &(Slot, Vec<Signature>),
signature_subscriptions: &Arc<RpcSignatureSubscriptions>,
notifier: &RpcNotifier,
) {
for signature in signatures {
if let Some(hashmap) = signature_subscriptions.read().unwrap().get(signature) {
for (
_,
SubscriptionData {
sink,
config: is_received_notification_enabled,
..
},
) in hashmap.iter()
{
if is_received_notification_enabled
.expect("All signature subscriptions must have this config field set")
{
notifier.notify(
Response {
context: RpcResponseContext {
slot: *received_slot,
},
value: RpcSignatureResult::ReceivedSignature,
},
&sink,
);
}
}
}
}
}
fn shutdown(&mut self) -> std::thread::Result<()> { fn shutdown(&mut self) -> std::thread::Result<()> {
if let Some(runtime) = self.notifier_runtime.take() { if let Some(runtime) = self.notifier_runtime.take() {
info!("RPC Notifier runtime - shutting down"); info!("RPC Notifier runtime - shutting down");
@ -1245,31 +1302,55 @@ pub(crate) mod tests {
Subscriber::new_test("signatureNotification"); Subscriber::new_test("signatureNotification");
let (processed_sub, _id_receiver, processed_recv) = let (processed_sub, _id_receiver, processed_recv) =
Subscriber::new_test("signatureNotification"); Subscriber::new_test("signatureNotification");
let (processed_sub3, _id_receiver, processed_recv3) =
Subscriber::new_test("signatureNotification");
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
past_bank_tx.signatures[0], past_bank_tx.signatures[0],
Some(CommitmentConfig::recent()), Some(RpcSignatureSubscribeConfig {
commitment: Some(CommitmentConfig::recent()),
enable_received_notification: Some(false),
}),
SubscriptionId::Number(1 as u64), SubscriptionId::Number(1 as u64),
past_bank_sub1, past_bank_sub1,
); );
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
past_bank_tx.signatures[0], past_bank_tx.signatures[0],
Some(CommitmentConfig::root()), Some(RpcSignatureSubscribeConfig {
commitment: Some(CommitmentConfig::root()),
enable_received_notification: Some(false),
}),
SubscriptionId::Number(2 as u64), SubscriptionId::Number(2 as u64),
past_bank_sub2, past_bank_sub2,
); );
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
processed_tx.signatures[0], processed_tx.signatures[0],
Some(CommitmentConfig::recent()), Some(RpcSignatureSubscribeConfig {
commitment: Some(CommitmentConfig::recent()),
enable_received_notification: Some(false),
}),
SubscriptionId::Number(3 as u64), SubscriptionId::Number(3 as u64),
processed_sub, processed_sub,
); );
subscriptions.add_signature_subscription( subscriptions.add_signature_subscription(
unprocessed_tx.signatures[0], unprocessed_tx.signatures[0],
Some(CommitmentConfig::recent()), Some(RpcSignatureSubscribeConfig {
commitment: Some(CommitmentConfig::recent()),
enable_received_notification: Some(false),
}),
SubscriptionId::Number(4 as u64), SubscriptionId::Number(4 as u64),
Subscriber::new_test("signatureNotification").0, Subscriber::new_test("signatureNotification").0,
); );
// Add a subscription that gets `received` notifications
subscriptions.add_signature_subscription(
unprocessed_tx.signatures[0],
Some(RpcSignatureSubscribeConfig {
commitment: Some(CommitmentConfig::recent()),
enable_received_notification: Some(true),
}),
SubscriptionId::Number(5 as u64),
processed_sub3,
);
{ {
let sig_subs = subscriptions let sig_subs = subscriptions
@ -1282,46 +1363,62 @@ pub(crate) mod tests {
assert!(sig_subs.contains_key(&processed_tx.signatures[0])); assert!(sig_subs.contains_key(&processed_tx.signatures[0]));
} }
let mut commitment_slots = CommitmentSlots::default(); let mut commitment_slots = CommitmentSlots::default();
commitment_slots.slot = 1; let received_slot = 1;
commitment_slots.slot = received_slot;
subscriptions
.notify_signatures_received((received_slot, vec![unprocessed_tx.signatures[0]]));
subscriptions.notify_subscribers(commitment_slots); subscriptions.notify_subscribers(commitment_slots);
let expected_res = RpcSignatureResult { err: None }; let expected_res =
RpcSignatureResult::ProcessedSignatureResult(ProcessedSignatureResult { err: None });
let received_expected_res = RpcSignatureResult::ReceivedSignature;
struct Notification { struct Notification {
slot: Slot, slot: Slot,
id: u64, id: u64,
} }
let expected_notification = |exp: Notification| -> String { let expected_notification =
let json = json!({ |exp: Notification, expected_res: &RpcSignatureResult| -> String {
"jsonrpc": "2.0", let json = json!({
"method": "signatureNotification", "jsonrpc": "2.0",
"params": { "method": "signatureNotification",
"result": { "params": {
"context": { "slot": exp.slot }, "result": {
"value": &expected_res, "context": { "slot": exp.slot },
}, "value": expected_res,
"subscription": exp.id, },
} "subscription": exp.id,
}); }
serde_json::to_string(&json).unwrap() });
}; serde_json::to_string(&json).unwrap()
};
// Expect to receive a notification from bank 1 because this subscription is // Expect to receive a notification from bank 1 because this subscription is
// looking for 0 confirmations and so checks the current bank // looking for 0 confirmations and so checks the current bank
let expected = expected_notification(Notification { slot: 1, id: 1 }); let expected = expected_notification(Notification { slot: 1, id: 1 }, &expected_res);
let (response, _) = robust_poll_or_panic(past_bank_recv1); let (response, _) = robust_poll_or_panic(past_bank_recv1);
assert_eq!(expected, response); assert_eq!(expected, response);
// Expect to receive a notification from bank 0 because this subscription is // Expect to receive a notification from bank 0 because this subscription is
// looking for 1 confirmation and so checks the past bank // looking for 1 confirmation and so checks the past bank
let expected = expected_notification(Notification { slot: 0, id: 2 }); let expected = expected_notification(Notification { slot: 0, id: 2 }, &expected_res);
let (response, _) = robust_poll_or_panic(past_bank_recv2); let (response, _) = robust_poll_or_panic(past_bank_recv2);
assert_eq!(expected, response); assert_eq!(expected, response);
let expected = expected_notification(Notification { slot: 1, id: 3 }); let expected = expected_notification(Notification { slot: 1, id: 3 }, &expected_res);
let (response, _) = robust_poll_or_panic(processed_recv); let (response, _) = robust_poll_or_panic(processed_recv);
assert_eq!(expected, response); assert_eq!(expected, response);
// Expect a "received" notification
let expected = expected_notification(
Notification {
slot: received_slot,
id: 5,
},
&received_expected_res,
);
let (response, _) = robust_poll_or_panic(processed_recv3);
assert_eq!(expected, response);
// Subscription should be automatically removed after notification // Subscription should be automatically removed after notification
let sig_subs = subscriptions let sig_subs = subscriptions
.subscriptions .subscriptions
@ -1334,7 +1431,7 @@ pub(crate) mod tests {
// Unprocessed signature subscription should not be removed // Unprocessed signature subscription should not be removed
assert_eq!( assert_eq!(
sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(), sig_subs.get(&unprocessed_tx.signatures[0]).unwrap().len(),
1 2
); );
} }

View File

@ -8,6 +8,7 @@ use crate::{
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker}, cluster_info_vote_listener::{VerifiedVoteReceiver, VoteTracker},
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
completed_data_sets_service::CompletedDataSetsSender,
ledger_cleanup_service::LedgerCleanupService, ledger_cleanup_service::LedgerCleanupService,
poh_recorder::PohRecorder, poh_recorder::PohRecorder,
replay_stage::{ReplayStage, ReplayStageConfig}, replay_stage::{ReplayStage, ReplayStageConfig},
@ -100,6 +101,7 @@ impl Tvu {
retransmit_slots_sender: RetransmitSlotsSender, retransmit_slots_sender: RetransmitSlotsSender,
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
replay_vote_sender: ReplayVoteSender, replay_vote_sender: ReplayVoteSender,
completed_data_sets_sender: CompletedDataSetsSender,
tvu_config: TvuConfig, tvu_config: TvuConfig,
) -> Self { ) -> Self {
let keypair: Arc<Keypair> = cluster_info.keypair.clone(); let keypair: Arc<Keypair> = cluster_info.keypair.clone();
@ -152,6 +154,7 @@ impl Tvu {
duplicate_slots_reset_sender, duplicate_slots_reset_sender,
verified_vote_receiver, verified_vote_receiver,
tvu_config.repair_validators, tvu_config.repair_validators,
completed_data_sets_sender,
); );
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
@ -249,6 +252,7 @@ pub mod tests {
}; };
use serial_test_derive::serial; use serial_test_derive::serial;
use solana_ledger::{ use solana_ledger::{
blockstore::BlockstoreSignals,
create_new_tmp_ledger, create_new_tmp_ledger,
genesis_utils::{create_genesis_config, GenesisConfigInfo}, genesis_utils::{create_genesis_config, GenesisConfigInfo},
}; };
@ -275,9 +279,13 @@ pub mod tests {
let cref1 = Arc::new(cluster_info1); let cref1 = Arc::new(cluster_info1);
let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config); let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config);
let (blockstore, l_receiver, completed_slots_receiver) = let BlockstoreSignals {
Blockstore::open_with_signal(&blockstore_path, None) blockstore,
.expect("Expected to successfully open ledger"); ledger_signal_receiver,
completed_slots_receiver,
..
} = Blockstore::open_with_signal(&blockstore_path, None)
.expect("Expected to successfully open ledger");
let blockstore = Arc::new(blockstore); let blockstore = Arc::new(blockstore);
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
let (exit, poh_recorder, poh_service, _entry_receiver) = let (exit, poh_recorder, poh_service, _entry_receiver) =
@ -288,6 +296,7 @@ pub mod tests {
let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded(); let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
let (_verified_vote_sender, verified_vote_receiver) = unbounded(); let (_verified_vote_sender, verified_vote_receiver) = unbounded();
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks)); let bank_forks = Arc::new(RwLock::new(bank_forks));
let tvu = Tvu::new( let tvu = Tvu::new(
&vote_keypair.pubkey(), &vote_keypair.pubkey(),
@ -303,7 +312,7 @@ pub mod tests {
} }
}, },
blockstore, blockstore,
l_receiver, ledger_signal_receiver,
&Arc::new(RpcSubscriptions::new( &Arc::new(RpcSubscriptions::new(
&exit, &exit,
bank_forks.clone(), bank_forks.clone(),
@ -322,6 +331,7 @@ pub mod tests {
retransmit_slots_sender, retransmit_slots_sender,
verified_vote_receiver, verified_vote_receiver,
replay_vote_sender, replay_vote_sender,
completed_data_sets_sender,
TvuConfig::default(), TvuConfig::default(),
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);

View File

@ -4,6 +4,7 @@ use crate::{
broadcast_stage::BroadcastStageType, broadcast_stage::BroadcastStageType,
cluster_info::{ClusterInfo, Node}, cluster_info::{ClusterInfo, Node},
cluster_info_vote_listener::VoteTracker, cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
contact_info::ContactInfo, contact_info::ContactInfo,
gossip_service::{discover_cluster, GossipService}, gossip_service::{discover_cluster, GossipService},
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
@ -21,12 +22,12 @@ use crate::{
transaction_status_service::TransactionStatusService, transaction_status_service::TransactionStatusService,
tvu::{Sockets, Tvu, TvuConfig}, tvu::{Sockets, Tvu, TvuConfig},
}; };
use crossbeam_channel::unbounded; use crossbeam_channel::{bounded, unbounded};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use solana_banks_server::rpc_banks_service::RpcBanksService; use solana_banks_server::rpc_banks_service::RpcBanksService;
use solana_ledger::{ use solana_ledger::{
bank_forks_utils, bank_forks_utils,
blockstore::{Blockstore, CompletedSlotsReceiver, PurgeType}, blockstore::{Blockstore, BlockstoreSignals, CompletedSlotsReceiver, PurgeType},
blockstore_db::BlockstoreRecoveryMode, blockstore_db::BlockstoreRecoveryMode,
blockstore_processor::{self, TransactionStatusSender}, blockstore_processor::{self, TransactionStatusSender},
create_new_tmp_ledger, create_new_tmp_ledger,
@ -64,6 +65,8 @@ use std::{
time::Duration, time::Duration,
}; };
pub const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ValidatorConfig { pub struct ValidatorConfig {
pub dev_halt_at_slot: Option<Slot>, pub dev_halt_at_slot: Option<Slot>,
@ -156,6 +159,7 @@ pub struct Validator {
rewards_recorder_service: Option<RewardsRecorderService>, rewards_recorder_service: Option<RewardsRecorderService>,
gossip_service: GossipService, gossip_service: GossipService,
serve_repair_service: ServeRepairService, serve_repair_service: ServeRepairService,
completed_data_sets_service: CompletedDataSetsService,
snapshot_packager_service: Option<SnapshotPackagerService>, snapshot_packager_service: Option<SnapshotPackagerService>,
poh_recorder: Arc<Mutex<PohRecorder>>, poh_recorder: Arc<Mutex<PohRecorder>>,
poh_service: PohService, poh_service: PohService,
@ -284,6 +288,15 @@ impl Validator {
block_commitment_cache.clone(), block_commitment_cache.clone(),
)); ));
let (completed_data_sets_sender, completed_data_sets_receiver) =
bounded(MAX_COMPLETED_DATA_SETS_IN_CHANNEL);
let completed_data_sets_service = CompletedDataSetsService::new(
completed_data_sets_receiver,
blockstore.clone(),
subscriptions.clone(),
&exit,
);
let rpc_override_health_check = Arc::new(AtomicBool::new(false)); let rpc_override_health_check = Arc::new(AtomicBool::new(false));
let rpc_service = config let rpc_service = config
.rpc_ports .rpc_ports
@ -468,6 +481,7 @@ impl Validator {
retransmit_slots_sender, retransmit_slots_sender,
verified_vote_receiver, verified_vote_receiver,
replay_vote_sender.clone(), replay_vote_sender.clone(),
completed_data_sets_sender,
TvuConfig { TvuConfig {
max_ledger_shreds: config.max_ledger_shreds, max_ledger_shreds: config.max_ledger_shreds,
halt_on_trusted_validators_accounts_hash_mismatch: config halt_on_trusted_validators_accounts_hash_mismatch: config
@ -509,6 +523,7 @@ impl Validator {
transaction_status_service, transaction_status_service,
rewards_recorder_service, rewards_recorder_service,
snapshot_packager_service, snapshot_packager_service,
completed_data_sets_service,
tpu, tpu,
tvu, tvu,
poh_service, poh_service,
@ -579,6 +594,7 @@ impl Validator {
self.serve_repair_service.join()?; self.serve_repair_service.join()?;
self.tpu.join()?; self.tpu.join()?;
self.tvu.join()?; self.tvu.join()?;
self.completed_data_sets_service.join()?;
self.ip_echo_server.shutdown_now(); self.ip_echo_server.shutdown_now();
Ok(()) Ok(())
@ -622,9 +638,13 @@ fn new_banks_from_ledger(
} }
} }
let (mut blockstore, ledger_signal_receiver, completed_slots_receiver) = let BlockstoreSignals {
Blockstore::open_with_signal(ledger_path, config.wal_recovery_mode.clone()) mut blockstore,
.expect("Failed to open ledger database"); ledger_signal_receiver,
completed_slots_receiver,
..
} = Blockstore::open_with_signal(ledger_path, config.wal_recovery_mode.clone())
.expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction); blockstore.set_no_compaction(config.no_rocksdb_compaction);
let process_options = blockstore_processor::ProcessOptions { let process_options = blockstore_processor::ProcessOptions {

View File

@ -5,6 +5,7 @@ use crate::{
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
cluster_info_vote_listener::VerifiedVoteReceiver, cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_slots::ClusterSlots, cluster_slots::ClusterSlots,
completed_data_sets_service::CompletedDataSetsSender,
repair_response, repair_response,
repair_service::{RepairInfo, RepairService}, repair_service::{RepairInfo, RepairService},
result::{Error, Result}, result::{Error, Result},
@ -123,6 +124,7 @@ fn run_insert<F>(
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
handle_duplicate: F, handle_duplicate: F,
metrics: &mut BlockstoreInsertionMetrics, metrics: &mut BlockstoreInsertionMetrics,
completed_data_sets_sender: &CompletedDataSetsSender,
) -> Result<()> ) -> Result<()>
where where
F: Fn(Shred), F: Fn(Shred),
@ -138,13 +140,13 @@ where
let mut i = 0; let mut i = 0;
shreds.retain(|shred| (verify_repair(&shred, &repair_infos[i]), i += 1).0); shreds.retain(|shred| (verify_repair(&shred, &repair_infos[i]), i += 1).0);
blockstore.insert_shreds_handle_duplicate( completed_data_sets_sender.try_send(blockstore.insert_shreds_handle_duplicate(
shreds, shreds,
Some(leader_schedule_cache), Some(leader_schedule_cache),
false, false,
&handle_duplicate, &handle_duplicate,
metrics, metrics,
)?; )?)?;
Ok(()) Ok(())
} }
@ -302,6 +304,7 @@ impl WindowService {
shred_filter: F, shred_filter: F,
cluster_slots: Arc<ClusterSlots>, cluster_slots: Arc<ClusterSlots>,
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
completed_data_sets_sender: CompletedDataSetsSender,
) -> WindowService ) -> WindowService
where where
F: 'static F: 'static
@ -333,6 +336,7 @@ impl WindowService {
leader_schedule_cache, leader_schedule_cache,
insert_receiver, insert_receiver,
duplicate_sender, duplicate_sender,
completed_data_sets_sender,
); );
let t_window = Self::start_recv_window_thread( let t_window = Self::start_recv_window_thread(
@ -387,6 +391,7 @@ impl WindowService {
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>, insert_receiver: CrossbeamReceiver<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
duplicate_sender: CrossbeamSender<Shred>, duplicate_sender: CrossbeamSender<Shred>,
completed_data_sets_sender: CompletedDataSetsSender,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let exit = exit.clone(); let exit = exit.clone();
let blockstore = blockstore.clone(); let blockstore = blockstore.clone();
@ -415,6 +420,7 @@ impl WindowService {
&leader_schedule_cache, &leader_schedule_cache,
&handle_duplicate, &handle_duplicate,
&mut metrics, &mut metrics,
&completed_data_sets_sender,
) { ) {
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) { if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
break; break;

View File

@ -285,8 +285,12 @@ fn test_rpc_subscriptions() {
let timeout = deadline.saturating_duration_since(Instant::now()); let timeout = deadline.saturating_duration_since(Instant::now());
match status_receiver.recv_timeout(timeout) { match status_receiver.recv_timeout(timeout) {
Ok((sig, result)) => { Ok((sig, result)) => {
assert!(result.value.err.is_none()); if let RpcSignatureResult::ProcessedSignatureResult(result) = result.value {
assert!(signature_set.remove(&sig)); assert!(result.err.is_none());
assert!(signature_set.remove(&sig));
} else {
panic!("Unexpected result");
}
} }
Err(_err) => { Err(_err) => {
assert!( assert!(

View File

@ -108,6 +108,19 @@ impl std::fmt::Display for InsertDataShredError {
} }
} }
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CompletedDataSetInfo {
pub slot: Slot,
pub start_index: u32,
pub end_index: u32,
}
pub struct BlockstoreSignals {
pub blockstore: Blockstore,
pub ledger_signal_receiver: Receiver<bool>,
pub completed_slots_receiver: CompletedSlotsReceiver,
}
// ledger window // ledger window
pub struct Blockstore { pub struct Blockstore {
db: Arc<Database>, db: Arc<Database>,
@ -339,16 +352,20 @@ impl Blockstore {
pub fn open_with_signal( pub fn open_with_signal(
ledger_path: &Path, ledger_path: &Path,
recovery_mode: Option<BlockstoreRecoveryMode>, recovery_mode: Option<BlockstoreRecoveryMode>,
) -> Result<(Self, Receiver<bool>, CompletedSlotsReceiver)> { ) -> Result<BlockstoreSignals> {
let mut blockstore = let mut blockstore =
Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly, recovery_mode)?; Self::open_with_access_type(ledger_path, AccessType::PrimaryOnly, recovery_mode)?;
let (signal_sender, signal_receiver) = sync_channel(1); let (ledger_signal_sender, ledger_signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) = let (completed_slots_sender, completed_slots_receiver) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL); sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
blockstore.new_shreds_signals = vec![signal_sender]; blockstore.new_shreds_signals = vec![ledger_signal_sender];
blockstore.completed_slots_senders = vec![completed_slots_sender]; blockstore.completed_slots_senders = vec![completed_slots_sender];
Ok((blockstore, signal_receiver, completed_slots_receiver)) Ok(BlockstoreSignals {
blockstore,
ledger_signal_receiver,
completed_slots_receiver,
})
} }
pub fn add_tree( pub fn add_tree(
@ -723,7 +740,7 @@ impl Blockstore {
is_trusted: bool, is_trusted: bool,
handle_duplicate: &F, handle_duplicate: &F,
metrics: &mut BlockstoreInsertionMetrics, metrics: &mut BlockstoreInsertionMetrics,
) -> Result<()> ) -> Result<Vec<CompletedDataSetInfo>>
where where
F: Fn(Shred), F: Fn(Shred),
{ {
@ -746,24 +763,30 @@ impl Blockstore {
let mut start = Measure::start("Shred insertion"); let mut start = Measure::start("Shred insertion");
let mut num_inserted = 0; let mut num_inserted = 0;
let mut index_meta_time = 0; let mut index_meta_time = 0;
let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
shreds.into_iter().for_each(|shred| { shreds.into_iter().for_each(|shred| {
if shred.is_data() { if shred.is_data() {
if self let shred_slot = shred.slot();
.check_insert_data_shred( if let Ok(completed_data_sets) = self.check_insert_data_shred(
shred, shred,
&mut erasure_metas, &mut erasure_metas,
&mut index_working_set, &mut index_working_set,
&mut slot_meta_working_set, &mut slot_meta_working_set,
&mut write_batch, &mut write_batch,
&mut just_inserted_data_shreds, &mut just_inserted_data_shreds,
&mut index_meta_time, &mut index_meta_time,
is_trusted, is_trusted,
handle_duplicate, handle_duplicate,
leader_schedule, leader_schedule,
false, false,
) ) {
.is_ok() newly_completed_data_sets.extend(completed_data_sets.into_iter().map(
{ |(start_index, end_index)| CompletedDataSetInfo {
slot: shred_slot,
start_index,
end_index,
},
));
num_inserted += 1; num_inserted += 1;
} }
} else if shred.is_code() { } else if shred.is_code() {
@ -800,6 +823,7 @@ impl Blockstore {
num_recovered = recovered_data.len(); num_recovered = recovered_data.len();
recovered_data.into_iter().for_each(|shred| { recovered_data.into_iter().for_each(|shred| {
if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) { if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
let shred_slot = shred.slot();
if shred.verify(&leader) { if shred.verify(&leader) {
match self.check_insert_data_shred( match self.check_insert_data_shred(
shred, shred,
@ -821,7 +845,16 @@ impl Blockstore {
num_recovered_failed_invalid += 1; num_recovered_failed_invalid += 1;
} }
Err(InsertDataShredError::BlockstoreError(_)) => {} Err(InsertDataShredError::BlockstoreError(_)) => {}
Ok(()) => { Ok(completed_data_sets) => {
newly_completed_data_sets.extend(
completed_data_sets.into_iter().map(
|(start_index, end_index)| CompletedDataSetInfo {
slot: shred_slot,
start_index,
end_index,
},
),
);
num_recovered_inserted += 1; num_recovered_inserted += 1;
} }
} }
@ -902,7 +935,7 @@ impl Blockstore {
metrics.num_recovered_exists = num_recovered_exists; metrics.num_recovered_exists = num_recovered_exists;
metrics.index_meta_time += index_meta_time; metrics.index_meta_time += index_meta_time;
Ok(()) Ok(newly_completed_data_sets)
} }
pub fn clear_unconfirmed_slot(&self, slot: Slot) { pub fn clear_unconfirmed_slot(&self, slot: Slot) {
@ -934,7 +967,7 @@ impl Blockstore {
shreds: Vec<Shred>, shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>, leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_trusted: bool, is_trusted: bool,
) -> Result<()> { ) -> Result<Vec<CompletedDataSetInfo>> {
self.insert_shreds_handle_duplicate( self.insert_shreds_handle_duplicate(
shreds, shreds,
leader_schedule, leader_schedule,
@ -1044,7 +1077,7 @@ impl Blockstore {
handle_duplicate: &F, handle_duplicate: &F,
leader_schedule: Option<&Arc<LeaderScheduleCache>>, leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_recovered: bool, is_recovered: bool,
) -> std::result::Result<(), InsertDataShredError> ) -> std::result::Result<Vec<(u32, u32)>, InsertDataShredError>
where where
F: Fn(Shred), F: Fn(Shred),
{ {
@ -1076,7 +1109,8 @@ impl Blockstore {
} }
let set_index = u64::from(shred.common_header.fec_set_index); let set_index = u64::from(shred.common_header.fec_set_index);
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)?; let newly_completed_data_sets =
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)?;
just_inserted_data_shreds.insert((slot, shred_index), shred); just_inserted_data_shreds.insert((slot, shred_index), shred);
index_meta_working_set_entry.did_insert_occur = true; index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true;
@ -1089,7 +1123,7 @@ impl Blockstore {
erasure_metas.insert((slot, set_index), meta); erasure_metas.insert((slot, set_index), meta);
} }
} }
Ok(()) Ok(newly_completed_data_sets)
} }
fn should_insert_coding_shred( fn should_insert_coding_shred(
@ -1205,7 +1239,7 @@ impl Blockstore {
data_index: &mut ShredIndex, data_index: &mut ShredIndex,
shred: &Shred, shred: &Shred,
write_batch: &mut WriteBatch, write_batch: &mut WriteBatch,
) -> Result<()> { ) -> Result<Vec<(u32, u32)>> {
let slot = shred.slot(); let slot = shred.slot();
let index = u64::from(shred.index()); let index = u64::from(shred.index());
@ -1240,13 +1274,15 @@ impl Blockstore {
// Commit step: commit all changes to the mutable structures at once, or none at all. // Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through. // We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?; write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?;
update_slot_meta( data_index.set_present(index, true);
let newly_completed_data_sets = update_slot_meta(
last_in_slot, last_in_slot,
last_in_data, last_in_data,
slot_meta, slot_meta,
index as u32, index as u32,
new_consumed, new_consumed,
shred.reference_tick(), shred.reference_tick(),
&data_index,
); );
if slot_meta.is_full() { if slot_meta.is_full() {
info!( info!(
@ -1254,9 +1290,8 @@ impl Blockstore {
slot_meta.slot, slot_meta.last_index slot_meta.slot, slot_meta.last_index
); );
} }
data_index.set_present(index, true);
trace!("inserted shred into slot {:?} and index {:?}", slot, index); trace!("inserted shred into slot {:?} and index {:?}", slot, index);
Ok(()) Ok(newly_completed_data_sets)
} }
pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> { pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
@ -2310,7 +2345,12 @@ impl Blockstore {
completed_ranges completed_ranges
.par_iter() .par_iter()
.map(|(start_index, end_index)| { .map(|(start_index, end_index)| {
self.get_entries_in_data_block(slot, *start_index, *end_index, &slot_meta) self.get_entries_in_data_block(
slot,
*start_index,
*end_index,
Some(&slot_meta),
)
}) })
.collect() .collect()
}) })
@ -2375,12 +2415,12 @@ impl Blockstore {
completed_data_ranges completed_data_ranges
} }
fn get_entries_in_data_block( pub fn get_entries_in_data_block(
&self, &self,
slot: Slot, slot: Slot,
start_index: u32, start_index: u32,
end_index: u32, end_index: u32,
slot_meta: &SlotMeta, slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> { ) -> Result<Vec<Entry>> {
let data_shred_cf = self.db.column::<cf::ShredData>(); let data_shred_cf = self.db.column::<cf::ShredData>();
@ -2390,23 +2430,33 @@ impl Blockstore {
data_shred_cf data_shred_cf
.get_bytes((slot, u64::from(i))) .get_bytes((slot, u64::from(i)))
.and_then(|serialized_shred| { .and_then(|serialized_shred| {
Shred::new_from_serialized_shred(serialized_shred.unwrap_or_else(|| { if serialized_shred.is_none() {
panic!( if let Some(slot_meta) = slot_meta {
"Shred with panic!(
slot: {}, "Shred with
index: {}, slot: {},
consumed: {}, index: {},
completed_indexes: {:?} consumed: {},
must exist if shred index was included in a range: {} {}", completed_indexes: {:?}
slot, must exist if shred index was included in a range: {} {}",
i, slot,
slot_meta.consumed, i,
slot_meta.completed_data_indexes, slot_meta.consumed,
start_index, slot_meta.completed_data_indexes,
end_index start_index,
) end_index
})) );
.map_err(|err| { } else {
return Err(BlockstoreError::InvalidShredData(Box::new(
bincode::ErrorKind::Custom(format!(
"Missing shred for slot {}, index {}",
slot, i
)),
)));
}
}
Shred::new_from_serialized_shred(serialized_shred.unwrap()).map_err(|err| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom( BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!( format!(
"Could not reconstruct shred from shred payload: {:?}", "Could not reconstruct shred from shred payload: {:?}",
@ -2450,8 +2500,13 @@ impl Blockstore {
completed_ranges completed_ranges
.par_iter() .par_iter()
.map(|(start_index, end_index)| { .map(|(start_index, end_index)| {
self.get_entries_in_data_block(slot, *start_index, *end_index, &slot_meta) self.get_entries_in_data_block(
.unwrap_or_default() slot,
*start_index,
*end_index,
Some(&slot_meta),
)
.unwrap_or_default()
}) })
.collect() .collect()
}) })
@ -2601,6 +2656,89 @@ impl Blockstore {
} }
} }
// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
// data set is complete, return the range of shred indexes [start_index, end_index]
// for that completed data set.
fn update_completed_data_indexes(
is_last_in_data: bool,
new_shred_index: u32,
received_data_shreds: &ShredIndex,
// Sorted array of shred indexes marked data complete
completed_data_indexes: &mut Vec<u32>,
) -> Vec<(u32, u32)> {
let mut first_greater_pos = None;
let mut prev_completed_shred_index = None;
// Find the first item in `completed_data_indexes > new_shred_index`
for (i, completed_data_index) in completed_data_indexes.iter().enumerate() {
// `completed_data_indexes` should be sorted from smallest to largest
assert!(
prev_completed_shred_index.is_none()
|| *completed_data_index > prev_completed_shred_index.unwrap()
);
if *completed_data_index > new_shred_index {
first_greater_pos = Some(i);
break;
}
prev_completed_shred_index = Some(*completed_data_index);
}
// Consecutive entries i, k, j in this vector represent potential ranges [i, k),
// [k, j) that could be completed data ranges
let mut check_ranges: Vec<u32> = vec![prev_completed_shred_index
.map(|completed_data_shred_index| completed_data_shred_index + 1)
.unwrap_or(0)];
let mut first_greater_data_complete_index =
first_greater_pos.map(|i| completed_data_indexes[i]);
// `new_shred_index` is data complete, so need to insert here into the
// `completed_data_indexes`
if is_last_in_data {
if first_greater_pos.is_some() {
// If there exists a data complete shred greater than `new_shred_index`,
// and the new shred is marked data complete, then the range
// [new_shred_index + 1, completed_data_indexes[pos]] may be complete,
// so add that range to check
check_ranges.push(new_shred_index + 1);
}
completed_data_indexes.insert(
first_greater_pos.unwrap_or_else(|| {
// If `first_greater_pos` is none, then there was no greater
// data complete index so mark this new shred's index as the latest data
// complete index
first_greater_data_complete_index = Some(new_shred_index);
completed_data_indexes.len()
}),
new_shred_index,
);
}
if first_greater_data_complete_index.is_none() {
// That means new_shred_index > all known completed data indexes and
// new shred not data complete, which means the data set of that new
// shred is not data complete
return vec![];
}
check_ranges.push(first_greater_data_complete_index.unwrap() + 1);
let mut completed_data_ranges = vec![];
for range in check_ranges.windows(2) {
let mut is_complete = true;
for shred_index in range[0]..range[1] {
// If we're missing any shreds, the data set cannot be confirmed
// to be completed, so check the next range
if !received_data_shreds.is_present(shred_index as u64) {
is_complete = false;
break;
}
}
if is_complete {
completed_data_ranges.push((range[0], range[1] - 1));
}
}
completed_data_ranges
}
fn update_slot_meta( fn update_slot_meta(
is_last_in_slot: bool, is_last_in_slot: bool,
is_last_in_data: bool, is_last_in_data: bool,
@ -2608,7 +2746,8 @@ fn update_slot_meta(
index: u32, index: u32,
new_consumed: u64, new_consumed: u64,
reference_tick: u8, reference_tick: u8,
) { received_data_shreds: &ShredIndex,
) -> Vec<(u32, u32)> {
let maybe_first_insert = slot_meta.received == 0; let maybe_first_insert = slot_meta.received == 0;
// Index is zero-indexed, while the "received" height starts from 1, // Index is zero-indexed, while the "received" height starts from 1,
// so received = index + 1 for the same shred. // so received = index + 1 for the same shred.
@ -2633,15 +2772,12 @@ fn update_slot_meta(
} }
}; };
if is_last_in_slot || is_last_in_data { update_completed_data_indexes(
let position = slot_meta is_last_in_slot || is_last_in_data,
.completed_data_indexes index,
.iter() received_data_shreds,
.position(|completed_data_index| *completed_data_index > index) &mut slot_meta.completed_data_indexes,
.unwrap_or_else(|| slot_meta.completed_data_indexes.len()); )
slot_meta.completed_data_indexes.insert(position, index);
}
} }
fn get_index_meta_entry<'a>( fn get_index_meta_entry<'a>(
@ -3973,11 +4109,49 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
} }
#[test]
fn test_data_set_completed_on_insert() {
let ledger_path = get_tmp_ledger_path!();
let BlockstoreSignals { blockstore, .. } =
Blockstore::open_with_signal(&ledger_path, None).unwrap();
// Create enough entries to fill 2 shreds, only the later one is data complete
let slot = 0;
let num_entries = max_ticks_per_n_shreds(1, None) + 1;
let entries = create_ticks(num_entries, slot, Hash::default());
let shreds = entries_to_test_shreds(entries, slot, 0, true, 0);
let num_shreds = shreds.len();
assert!(num_shreds > 1);
assert!(blockstore
.insert_shreds(shreds[1..].to_vec(), None, false)
.unwrap()
.is_empty());
assert_eq!(
blockstore
.insert_shreds(vec![shreds[0].clone()], None, false)
.unwrap(),
vec![CompletedDataSetInfo {
slot,
start_index: 0,
end_index: num_shreds as u32 - 1
}]
);
// Inserting shreds again doesn't trigger notification
assert!(blockstore
.insert_shreds(shreds, None, false)
.unwrap()
.is_empty());
}
#[test] #[test]
pub fn test_new_shreds_signal() { pub fn test_new_shreds_signal() {
// Initialize ledger // Initialize ledger
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let (ledger, recvr, _) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let BlockstoreSignals {
blockstore: ledger,
ledger_signal_receiver: recvr,
..
} = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger); let ledger = Arc::new(ledger);
let entries_per_slot = 50; let entries_per_slot = 50;
@ -4057,7 +4231,11 @@ pub mod tests {
pub fn test_completed_shreds_signal() { pub fn test_completed_shreds_signal() {
// Initialize ledger // Initialize ledger
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let BlockstoreSignals {
blockstore: ledger,
completed_slots_receiver: recvr,
..
} = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger); let ledger = Arc::new(ledger);
let entries_per_slot = 10; let entries_per_slot = 10;
@ -4079,7 +4257,11 @@ pub mod tests {
pub fn test_completed_shreds_signal_orphans() { pub fn test_completed_shreds_signal_orphans() {
// Initialize ledger // Initialize ledger
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let BlockstoreSignals {
blockstore: ledger,
completed_slots_receiver: recvr,
..
} = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger); let ledger = Arc::new(ledger);
let entries_per_slot = 10; let entries_per_slot = 10;
@ -4119,7 +4301,11 @@ pub mod tests {
pub fn test_completed_shreds_signal_many() { pub fn test_completed_shreds_signal_many() {
// Initialize ledger // Initialize ledger
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
let (ledger, _, recvr) = Blockstore::open_with_signal(&ledger_path, None).unwrap(); let BlockstoreSignals {
blockstore: ledger,
completed_slots_receiver: recvr,
..
} = Blockstore::open_with_signal(&ledger_path, None).unwrap();
let ledger = Arc::new(ledger); let ledger = Arc::new(ledger);
let entries_per_slot = 10; let entries_per_slot = 10;
@ -7049,4 +7235,64 @@ pub mod tests {
} }
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
} }
#[test]
fn test_update_completed_data_indexes() {
let mut completed_data_indexes: Vec<u32> = vec![];
let mut shred_index = ShredIndex::default();
for i in 0..10 {
shred_index.set_present(i as u64, true);
assert_eq!(
update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes),
vec![(i, i)]
);
assert_eq!(completed_data_indexes, (0..=i).collect::<Vec<u32>>());
}
}
#[test]
fn test_update_completed_data_indexes_out_of_order() {
let mut completed_data_indexes = vec![];
let mut shred_index = ShredIndex::default();
shred_index.set_present(4, true);
assert!(
update_completed_data_indexes(false, 4, &shred_index, &mut completed_data_indexes)
.is_empty()
);
assert!(completed_data_indexes.is_empty());
shred_index.set_present(2, true);
assert!(
update_completed_data_indexes(false, 2, &shred_index, &mut completed_data_indexes)
.is_empty()
);
assert!(completed_data_indexes.is_empty());
shred_index.set_present(3, true);
assert!(
update_completed_data_indexes(true, 3, &shred_index, &mut completed_data_indexes)
.is_empty()
);
assert_eq!(completed_data_indexes, vec![3]);
// Inserting data complete shred 1 now confirms the range of shreds [2, 3]
// is part of the same data set
shred_index.set_present(1, true);
assert_eq!(
update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes),
vec![(2, 3)]
);
assert_eq!(completed_data_indexes, vec![1, 3]);
// Inserting data complete shred 0 now confirms the range of shreds [0]
// is part of the same data set
shred_index.set_present(0, true);
assert_eq!(
update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes),
vec![(0, 0), (1, 1)]
);
assert_eq!(completed_data_indexes, vec![0, 1, 3]);
}
} }

View File

@ -2,10 +2,14 @@ use assert_matches::assert_matches;
use gag::BufferRedirect; use gag::BufferRedirect;
use log::*; use log::*;
use serial_test_derive::serial; use serial_test_derive::serial;
use solana_client::rpc_client::RpcClient; use solana_client::{
use solana_client::thin_client::create_client; pubsub_client::{PubsubClient, SignatureResult},
rpc_client::RpcClient,
thin_client::create_client,
};
use solana_core::{ use solana_core::{
broadcast_stage::BroadcastStageType, broadcast_stage::BroadcastStageType,
cluster_info::VALIDATOR_PORT_RANGE,
consensus::{SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, consensus::{SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH},
gossip_service::discover_cluster, gossip_service::discover_cluster,
validator::ValidatorConfig, validator::ValidatorConfig,
@ -34,6 +38,7 @@ use solana_sdk::{
poh_config::PohConfig, poh_config::PohConfig,
pubkey::Pubkey, pubkey::Pubkey,
signature::{Keypair, Signer}, signature::{Keypair, Signer},
system_transaction,
}; };
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::{ use std::{
@ -132,6 +137,71 @@ fn test_spend_and_verify_all_nodes_3() {
); );
} }
#[test]
#[serial]
fn test_local_cluster_signature_subscribe() {
solana_logger::setup();
let num_nodes = 2;
let cluster = LocalCluster::new_with_equal_stakes(num_nodes, 10_000, 100);
let nodes = cluster.get_node_pubkeys();
// Get non leader
let non_bootstrap_id = nodes
.into_iter()
.find(|id| *id != cluster.entry_point_info.id)
.unwrap();
let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap();
let tx_client = create_client(
non_bootstrap_info.client_facing_addr(),
VALIDATOR_PORT_RANGE,
);
let (blockhash, _fee_calculator, _last_valid_slot) = tx_client
.get_recent_blockhash_with_commitment(CommitmentConfig::recent())
.unwrap();
let mut transaction =
system_transaction::transfer(&cluster.funding_keypair, &Pubkey::new_rand(), 10, blockhash);
let (mut sig_subscribe_client, receiver) = PubsubClient::signature_subscribe(
&format!("ws://{}", &non_bootstrap_info.rpc_pubsub.to_string()),
&transaction.signatures[0],
)
.unwrap();
tx_client
.retry_transfer(&cluster.funding_keypair, &mut transaction, 5)
.unwrap();
let mut got_received_notification = false;
loop {
let responses: Vec<_> = receiver.try_iter().collect();
let mut should_break = false;
for response in responses {
match response.value {
SignatureResult::ProcessedSignatureResult(_) => {
should_break = true;
break;
}
SignatureResult::ReceivedSignature => {
got_received_notification = true;
}
}
}
if should_break {
break;
}
sleep(Duration::from_millis(100));
}
// If we don't drop the cluster, the blocking web socket service
// won't return, and the `sig_subscribe_client` won't shut down
drop(cluster);
sig_subscribe_client.shutdown().unwrap();
assert!(got_received_notification);
}
#[test] #[test]
#[allow(unused_attributes)] #[allow(unused_attributes)]
#[ignore] #[ignore]