diff --git a/clap-utils/src/input_parsers.rs b/clap-utils/src/input_parsers.rs index 3ff4092607..ce431e075d 100644 --- a/clap-utils/src/input_parsers.rs +++ b/clap-utils/src/input_parsers.rs @@ -189,6 +189,7 @@ pub fn commitment_of(matches: &ArgMatches<'_>, name: &str) -> Option CommitmentConfig::recent(), "root" => CommitmentConfig::root(), "single" => CommitmentConfig::single(), + "singleGossip" => CommitmentConfig::single_gossip(), _ => CommitmentConfig::default(), }) } diff --git a/cli/src/cli.rs b/cli/src/cli.rs index c42a16c283..91c96f937a 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -31,7 +31,7 @@ use solana_client::{ client_error::{ClientError, ClientErrorKind, Result as ClientResult}, nonce_utils, rpc_client::RpcClient, - rpc_config::{RpcLargestAccountsFilter, RpcSendTransactionConfig}, + rpc_config::{RpcLargestAccountsFilter, RpcSendTransactionConfig, RpcTransactionLogsFilter}, rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, rpc_response::{RpcKeyedAccount, RpcLeaderSchedule}, }; @@ -120,6 +120,9 @@ pub enum CliCommand { }, LeaderSchedule, LiveSlots, + Logs { + filter: RpcTransactionLogsFilter, + }, Ping { lamports: u64, interval: Duration, @@ -585,6 +588,7 @@ pub fn parse_command( command: CliCommand::LiveSlots, signers: vec![], }), + ("logs", Some(matches)) => parse_logs(matches, wallet_manager), ("block-production", Some(matches)) => parse_show_block_production(matches), ("gossip", Some(_matches)) => Ok(CliCommandInfo { command: CliCommand::ShowGossip, @@ -1559,7 +1563,8 @@ pub fn process_command(config: &CliConfig) -> ProcessResult { process_inflation_subcommand(&rpc_client, config, inflation_subcommand) } CliCommand::LeaderSchedule => process_leader_schedule(&rpc_client), - CliCommand::LiveSlots => process_live_slots(&config.websocket_url), + CliCommand::LiveSlots => process_live_slots(&config), + CliCommand::Logs { filter } => process_logs(&config, filter), CliCommand::Ping { lamports, interval, diff --git a/cli/src/cluster_query.rs b/cli/src/cluster_query.rs index 3237c8b50f..95e7153c40 100644 --- a/cli/src/cluster_query.rs +++ b/cli/src/cluster_query.rs @@ -7,7 +7,10 @@ use chrono::{Local, TimeZone}; use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand}; use console::{style, Emoji}; use solana_clap_utils::{ - commitment::commitment_arg, input_parsers::*, input_validators::*, keypair::DefaultSigner, + commitment::{commitment_arg, commitment_arg_with_default}, + input_parsers::*, + input_validators::*, + keypair::DefaultSigner, }; use solana_cli_output::{ display::{ @@ -21,7 +24,7 @@ use solana_client::{ rpc_client::{GetConfirmedSignaturesForAddress2Config, RpcClient}, rpc_config::{ RpcAccountInfoConfig, RpcLargestAccountsConfig, RpcLargestAccountsFilter, - RpcProgramAccountsConfig, + RpcProgramAccountsConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter, }, rpc_filter, rpc_response::SlotInfo, @@ -233,6 +236,26 @@ impl ClusterQuerySubCommands for App<'_, '_> { SubCommand::with_name("live-slots") .about("Show information about the current slot progression"), ) + .subcommand( + SubCommand::with_name("logs") + .about("Stream transaction logs") + .arg( + pubkey!(Arg::with_name("address") + .index(1) + .value_name("ADDRESS"), + "Account address to monitor \ + [default: monitor all transactions except for votes] \ + ") + ) + .arg( + Arg::with_name("include_votes") + .long("include-votes") + .takes_value(false) + .conflicts_with("address") + .help("Include vote transactions when monitoring all transactions") + ) + .arg(commitment_arg_with_default("singleGossip")), + ) .subcommand( SubCommand::with_name("block-production") .about("Show information about block production") @@ -1172,24 +1195,83 @@ pub fn process_ping( Ok("".to_string()) } -pub fn process_live_slots(url: &str) -> ProcessResult { - let exit = Arc::new(AtomicBool::new(false)); +pub fn parse_logs( + matches: &ArgMatches<'_>, + wallet_manager: &mut Option>, +) -> Result { + let address = pubkey_of_signer(matches, "address", wallet_manager)?; + let include_votes = matches.is_present("include_votes"); - // Disable Ctrl+C handler as sometimes the PubsubClient shutdown can stall. Also it doesn't - // really matter that the shutdown is clean because the process is terminating. - /* - let exit_clone = exit.clone(); - ctrlc::set_handler(move || { - exit_clone.store(true, Ordering::Relaxed); - })?; - */ + let filter = match address { + None => { + if include_votes { + RpcTransactionLogsFilter::AllWithVotes + } else { + RpcTransactionLogsFilter::All + } + } + Some(address) => RpcTransactionLogsFilter::Mentions(vec![address.to_string()]), + }; + + Ok(CliCommandInfo { + command: CliCommand::Logs { filter }, + signers: vec![], + }) +} + +pub fn process_logs(config: &CliConfig, filter: &RpcTransactionLogsFilter) -> ProcessResult { + println!( + "Streaming transaction logs{}. {:?} commitment", + match filter { + RpcTransactionLogsFilter::All => "".into(), + RpcTransactionLogsFilter::AllWithVotes => " (including votes)".into(), + RpcTransactionLogsFilter::Mentions(addresses) => + format!(" mentioning {}", addresses.join(",")), + }, + config.commitment.commitment + ); + + let (_client, receiver) = PubsubClient::logs_subscribe( + &config.websocket_url, + filter.clone(), + RpcTransactionLogsConfig { + commitment: Some(config.commitment), + }, + )?; + + loop { + match receiver.recv() { + Ok(logs) => { + println!("Transaction executed in slot {}:", logs.context.slot); + println!(" Signature: {}", logs.value.signature); + println!( + " Status: {}", + logs.value + .err + .map(|err| err.to_string()) + .unwrap_or_else(|| "Ok".to_string()) + ); + println!(" Log Messages:"); + for log in logs.value.logs { + println!(" {}", log); + } + } + Err(err) => { + return Ok(format!("Disconnected: {}", err)); + } + } + } +} + +pub fn process_live_slots(config: &CliConfig) -> ProcessResult { + let exit = Arc::new(AtomicBool::new(false)); let mut current: Option = None; let mut message = "".to_string(); let slot_progress = new_spinner_progress_bar(); slot_progress.set_message("Connecting..."); - let (mut client, receiver) = PubsubClient::slot_subscribe(url)?; + let (mut client, receiver) = PubsubClient::slot_subscribe(&config.websocket_url)?; slot_progress.set_message("Connected."); let spacer = "|"; diff --git a/client/src/pubsub_client.rs b/client/src/pubsub_client.rs index 107d3fc0d3..d88a56d646 100644 --- a/client/src/pubsub_client.rs +++ b/client/src/pubsub_client.rs @@ -1,6 +1,6 @@ use crate::{ - rpc_config::RpcSignatureSubscribeConfig, - rpc_response::{Response as RpcResponse, RpcSignatureResult, SlotInfo}, + rpc_config::{RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter}, + rpc_response::{Response as RpcResponse, RpcLogsResponse, RpcSignatureResult, SlotInfo}, }; use log::*; use serde::de::DeserializeOwned; @@ -23,8 +23,6 @@ use thiserror::Error; use tungstenite::{client::AutoStream, connect, Message, WebSocket}; use url::{ParseError, Url}; -type PubsubSignatureResponse = PubsubClientSubscription>; - #[derive(Debug, Error)] pub enum PubsubClientError { #[error("url parse error")] @@ -36,8 +34,8 @@ pub enum PubsubClientError { #[error("json parse error")] JsonParseError(#[from] serde_json::error::Error), - #[error("unexpected message format")] - UnexpectedMessageError, + #[error("unexpected message format: {0}")] + UnexpectedMessageError(String), } pub struct PubsubClientSubscription @@ -92,8 +90,11 @@ where return Ok(x); } } - - Err(PubsubClientError::UnexpectedMessageError) + // TODO: Add proper JSON RPC response/error handling... + Err(PubsubClientError::UnexpectedMessageError(format!( + "{:?}", + json_msg + ))) } pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> { @@ -117,14 +118,18 @@ where let message_text = &message.into_text().unwrap(); let json_msg: Map = serde_json::from_str(message_text)?; - if let Some(Object(value_1)) = json_msg.get("params") { - if let Some(value_2) = value_1.get("result") { - let x: T = serde_json::from_value::(value_2.clone()).unwrap(); + if let Some(Object(params)) = json_msg.get("params") { + if let Some(result) = params.get("result") { + let x: T = serde_json::from_value::(result.clone()).unwrap(); return Ok(x); } } - Err(PubsubClientError::UnexpectedMessageError) + // TODO: Add proper JSON RPC response/error handling... + Err(PubsubClientError::UnexpectedMessageError(format!( + "{:?}", + json_msg + ))) } pub fn shutdown(&mut self) -> std::thread::Result<()> { @@ -141,15 +146,79 @@ where } } -const SLOT_OPERATION: &str = "slot"; -const SIGNATURE_OPERATION: &str = "signature"; +pub type LogsSubscription = ( + PubsubClientSubscription>, + Receiver>, +); +pub type SlotsSubscription = (PubsubClientSubscription, Receiver); +pub type SignatureSubscription = ( + PubsubClientSubscription>, + Receiver>, +); pub struct PubsubClient {} impl PubsubClient { - pub fn slot_subscribe( + pub fn logs_subscribe( url: &str, - ) -> Result<(PubsubClientSubscription, Receiver), PubsubClientError> { + filter: RpcTransactionLogsFilter, + config: RpcTransactionLogsConfig, + ) -> Result { + let url = Url::parse(url)?; + let (socket, _response) = connect(url)?; + let (sender, receiver) = channel(); + + let socket = Arc::new(RwLock::new(socket)); + let socket_clone = socket.clone(); + let exit = Arc::new(AtomicBool::new(false)); + let exit_clone = exit.clone(); + + let subscription_id = + PubsubClientSubscription::>::send_subscribe( + &socket_clone, + json!({ + "jsonrpc":"2.0","id":1,"method":"logsSubscribe","params":[filter, config] + }) + .to_string(), + )?; + + let t_cleanup = std::thread::spawn(move || { + loop { + if exit_clone.load(Ordering::Relaxed) { + break; + } + + match PubsubClientSubscription::read_message(&socket_clone) { + Ok(message) => match sender.send(message) { + Ok(_) => (), + Err(err) => { + info!("receive error: {:?}", err); + break; + } + }, + Err(err) => { + info!("receive error: {:?}", err); + break; + } + } + } + + info!("websocket - exited receive loop"); + }); + + let result = PubsubClientSubscription { + message_type: PhantomData, + operation: "logs", + socket, + subscription_id, + t_cleanup: Some(t_cleanup), + exit, + }; + + Ok((result, receiver)) + } + + pub fn slot_subscribe(url: &str) -> Result { let url = Url::parse(url)?; let (socket, _response) = connect(url)?; let (sender, receiver) = channel::(); @@ -161,41 +230,37 @@ impl PubsubClient { let subscription_id = PubsubClientSubscription::::send_subscribe( &socket_clone, json!({ - "jsonrpc":"2.0","id":1,"method":format!("{}Subscribe", SLOT_OPERATION),"params":[] + "jsonrpc":"2.0","id":1,"method":"slotSubscribe","params":[] }) .to_string(), - ) - .unwrap(); + )?; let t_cleanup = std::thread::spawn(move || { loop { if exit_clone.load(Ordering::Relaxed) { break; } - - let message: Result = - PubsubClientSubscription::read_message(&socket_clone); - - if let Ok(msg) = message { - match sender.send(msg) { + match PubsubClientSubscription::read_message(&socket_clone) { + Ok(message) => match sender.send(message) { Ok(_) => (), Err(err) => { info!("receive error: {:?}", err); break; } + }, + Err(err) => { + info!("receive error: {:?}", err); + break; } - } else { - info!("receive error: {:?}", message); - break; } } info!("websocket - exited receive loop"); }); - let result: PubsubClientSubscription = PubsubClientSubscription { + let result = PubsubClientSubscription { message_type: PhantomData, - operation: SLOT_OPERATION, + operation: "slot", socket, subscription_id, t_cleanup: Some(t_cleanup), @@ -209,16 +274,10 @@ impl PubsubClient { url: &str, signature: &Signature, config: Option, - ) -> Result< - ( - PubsubSignatureResponse, - Receiver>, - ), - PubsubClientError, - > { + ) -> Result { let url = Url::parse(url)?; let (socket, _response) = connect(url)?; - let (sender, receiver) = channel::>(); + let (sender, receiver) = channel(); let socket = Arc::new(RwLock::new(socket)); let socket_clone = socket.clone(); @@ -227,7 +286,7 @@ impl PubsubClient { let body = json!({ "jsonrpc":"2.0", "id":1, - "method":format!("{}Subscribe", SIGNATURE_OPERATION), + "method":"signatureSubscribe", "params":[ signature.to_string(), config @@ -238,8 +297,7 @@ impl PubsubClient { PubsubClientSubscription::>::send_subscribe( &socket_clone, body, - ) - .unwrap(); + )?; let t_cleanup = std::thread::spawn(move || { loop { @@ -267,15 +325,14 @@ impl PubsubClient { info!("websocket - exited receive loop"); }); - let result: PubsubClientSubscription> = - PubsubClientSubscription { - message_type: PhantomData, - operation: SIGNATURE_OPERATION, - socket, - subscription_id, - t_cleanup: Some(t_cleanup), - exit, - }; + let result = PubsubClientSubscription { + message_type: PhantomData, + operation: "signature", + socket, + subscription_id, + t_cleanup: Some(t_cleanup), + exit, + }; Ok((result, receiver)) } diff --git a/client/src/rpc_config.rs b/client/src/rpc_config.rs index 671da02b38..685f1eddb6 100644 --- a/client/src/rpc_config.rs +++ b/client/src/rpc_config.rs @@ -71,6 +71,20 @@ pub struct RpcProgramAccountsConfig { pub account_config: RpcAccountInfoConfig, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum RpcTransactionLogsFilter { + All, + AllWithVotes, + Mentions(Vec), // base58-encoded list of addresses +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcTransactionLogsConfig { + pub commitment: Option, +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum RpcTokenAccountsFilter { diff --git a/client/src/rpc_response.rs b/client/src/rpc_response.rs index 94ef19c449..1dc8ec8994 100644 --- a/client/src/rpc_response.rs +++ b/client/src/rpc_response.rs @@ -108,6 +108,14 @@ pub enum RpcSignatureResult { ReceivedSignature(ReceivedSignatureResult), } +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct RpcLogsResponse { + pub signature: String, // Signature as base58 string + pub err: Option, + pub logs: Vec, +} + #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct ProcessedSignatureResult { diff --git a/core/src/rpc_pubsub.rs b/core/src/rpc_pubsub.rs index 51f15f4ae7..c27f390e41 100644 --- a/core/src/rpc_pubsub.rs +++ b/core/src/rpc_pubsub.rs @@ -6,8 +6,13 @@ use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, Session, SubscriptionId}; use solana_account_decoder::UiAccount; use solana_client::{ - rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig}, - rpc_response::{Response as RpcResponse, RpcKeyedAccount, RpcSignatureResult, SlotInfo}, + rpc_config::{ + RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, + RpcTransactionLogsConfig, RpcTransactionLogsFilter, + }, + rpc_response::{ + Response as RpcResponse, RpcKeyedAccount, RpcLogsResponse, RpcSignatureResult, SlotInfo, + }, }; #[cfg(test)] use solana_runtime::bank_forks::BankForks; @@ -75,6 +80,24 @@ pub trait RpcSolPubSub { fn program_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; + // Get logs for all transactions that reference the specified address + #[pubsub(subscription = "logsNotification", subscribe, name = "logsSubscribe")] + fn logs_subscribe( + &self, + meta: Self::Metadata, + subscriber: Subscriber>, + filter: RpcTransactionLogsFilter, + config: RpcTransactionLogsConfig, + ); + + // Unsubscribe from logs notification subscription. + #[pubsub( + subscription = "logsNotification", + unsubscribe, + name = "logsUnsubscribe" + )] + fn logs_unsubscribe(&self, meta: Option, id: SubscriptionId) -> Result; + // Get notification when signature is verified // Accepts signature parameter as base-58 encoded string #[pubsub( @@ -241,6 +264,67 @@ impl RpcSolPubSub for RpcSolPubSubImpl { } } + fn logs_subscribe( + &self, + _meta: Self::Metadata, + subscriber: Subscriber>, + filter: RpcTransactionLogsFilter, + config: RpcTransactionLogsConfig, + ) { + info!("logs_subscribe"); + + let (address, include_votes) = match filter { + RpcTransactionLogsFilter::All => (None, false), + RpcTransactionLogsFilter::AllWithVotes => (None, true), + RpcTransactionLogsFilter::Mentions(addresses) => { + match addresses.len() { + 1 => match param::(&addresses[0], "mentions") { + Ok(address) => (Some(address), false), + Err(e) => { + subscriber.reject(e).unwrap(); + return; + } + }, + _ => { + // Room is reserved in the API to support multiple addresses, but for now + // the implementation only supports one + subscriber + .reject(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Only 1 address supported".into(), + data: None, + }) + .unwrap(); + return; + } + } + } + }; + + let id = self.uid.fetch_add(1, atomic::Ordering::Relaxed); + let sub_id = SubscriptionId::Number(id as u64); + self.subscriptions.add_logs_subscription( + address, + include_votes, + config.commitment, + sub_id, + subscriber, + ) + } + + fn logs_unsubscribe(&self, _meta: Option, id: SubscriptionId) -> Result { + info!("logs_unsubscribe: id={:?}", id); + if self.subscriptions.remove_logs_subscription(&id) { + Ok(true) + } else { + Err(Error { + code: ErrorCode::InvalidParams, + message: "Invalid Request: Subscription id does not exist".into(), + data: None, + }) + } + } + fn signature_subscribe( &self, _meta: Self::Metadata, diff --git a/core/src/rpc_subscriptions.rs b/core/src/rpc_subscriptions.rs index e288433e75..ed6c29c2ce 100644 --- a/core/src/rpc_subscriptions.rs +++ b/core/src/rpc_subscriptions.rs @@ -17,12 +17,14 @@ use solana_client::{ rpc_filter::RpcFilterType, rpc_response::{ ProcessedSignatureResult, ReceivedSignatureResult, Response, RpcKeyedAccount, - RpcResponseContext, RpcSignatureResult, SlotInfo, + RpcLogsResponse, RpcResponseContext, RpcSignatureResult, SlotInfo, }, }; use solana_measure::measure::Measure; use solana_runtime::{ - bank::Bank, + bank::{ + Bank, TransactionLogCollectorConfig, TransactionLogCollectorFilter, TransactionLogInfo, + }, bank_forks::BankForks, commitment::{BlockCommitmentCache, CommitmentSlots}, }; @@ -35,16 +37,16 @@ use solana_sdk::{ transaction, }; use solana_vote_program::vote_state::Vote; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, -}; -use std::thread::{Builder, JoinHandle}; -use std::time::Duration; use std::{ collections::{HashMap, HashSet}, iter, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{Receiver, RecvTimeoutError, SendError, Sender}, + }, sync::{Arc, Mutex, RwLock}, + thread::{Builder, JoinHandle}, + time::Duration, }; // Stuck on tokio 0.1 until the jsonrpc-pubsub crate upgrades to tokio 0.2 @@ -52,6 +54,28 @@ use tokio_01::runtime::{Builder as RuntimeBuilder, Runtime, TaskExecutor}; const RECEIVE_DELAY_MILLIS: u64 = 100; +trait BankGetTransactionLogsAdapter { + fn get_transaction_logs_adapter( + &self, + stuff: &(Option, bool), + ) -> Option>; +} + +impl BankGetTransactionLogsAdapter for Bank { + fn get_transaction_logs_adapter( + &self, + config: &(Option, bool), + ) -> Option> { + let mut logs = self.get_transaction_logs(config.0.as_ref()); + + if config.0.is_none() && !config.1 { + // Filter out votes if the subscriber doesn't want them + logs = logs.map(|logs| logs.into_iter().filter(|log| !log.is_vote).collect()); + } + logs + } +} + // A more human-friendly version of Vote, with the bank state signature base58 encoded. #[derive(Serialize, Deserialize, Debug)] pub struct RpcVote { @@ -103,6 +127,12 @@ type RpcAccountSubscriptions = RwLock< HashMap, UiAccountEncoding>>, >, >; +type RpcLogsSubscriptions = RwLock< + HashMap< + (Option, bool), + HashMap, ()>>, + >, +>; type RpcProgramSubscriptions = RwLock< HashMap< Pubkey, @@ -182,7 +212,7 @@ where S: Clone + Serialize, B: Fn(&Bank, &K) -> X, F: Fn(X, &K, Slot, Option, Arc) -> (Box>, Slot), - X: Clone + Serialize + Default, + X: Clone + Default, T: Clone, { let mut notified_set: HashSet = HashSet::new(); @@ -321,12 +351,34 @@ fn filter_program_results( (accounts, last_notified_slot) } +fn filter_logs_results( + logs: Option>, + _address: &(Option, bool), + last_notified_slot: Slot, + _config: Option<()>, + _bank: Arc, +) -> (Box>, Slot) { + match logs { + None => (Box::new(iter::empty()), last_notified_slot), + Some(logs) => ( + Box::new(logs.into_iter().map(|log| RpcLogsResponse { + signature: log.signature.to_string(), + err: log.result.err(), + logs: log.log_messages, + })), + last_notified_slot, + ), + } +} + #[derive(Clone)] struct Subscriptions { account_subscriptions: Arc, program_subscriptions: Arc, + logs_subscriptions: Arc, signature_subscriptions: Arc, gossip_account_subscriptions: Arc, + gossip_logs_subscriptions: Arc, gossip_program_subscriptions: Arc, gossip_signature_subscriptions: Arc, slot_subscriptions: Arc, @@ -383,9 +435,11 @@ impl RpcSubscriptions { ) = std::sync::mpsc::channel(); let account_subscriptions = Arc::new(RpcAccountSubscriptions::default()); + let logs_subscriptions = Arc::new(RpcLogsSubscriptions::default()); let program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); let signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); let gossip_account_subscriptions = Arc::new(RpcAccountSubscriptions::default()); + let gossip_logs_subscriptions = Arc::new(RpcLogsSubscriptions::default()); let gossip_program_subscriptions = Arc::new(RpcProgramSubscriptions::default()); let gossip_signature_subscriptions = Arc::new(RpcSignatureSubscriptions::default()); let slot_subscriptions = Arc::new(RpcSlotSubscriptions::default()); @@ -398,9 +452,11 @@ impl RpcSubscriptions { let exit_clone = exit.clone(); let subscriptions = Subscriptions { account_subscriptions, + logs_subscriptions, program_subscriptions, signature_subscriptions, gossip_account_subscriptions, + gossip_logs_subscriptions, gossip_program_subscriptions, gossip_signature_subscriptions, slot_subscriptions, @@ -474,6 +530,25 @@ impl RpcSubscriptions { ) } + fn check_logs( + address_with_enable_votes_flag: &(Option, bool), + bank_forks: &Arc>, + logs_subscriptions: Arc, + notifier: &RpcNotifier, + commitment_slots: &CommitmentSlots, + ) -> HashSet { + let subscriptions = logs_subscriptions.read().unwrap(); + check_commitment_and_notify( + &subscriptions, + address_with_enable_votes_flag, + bank_forks, + commitment_slots, + Bank::get_transaction_logs_adapter, + filter_logs_results, + notifier, + ) + } + fn check_program( program_id: &Pubkey, bank_forks: &Arc>, @@ -647,6 +722,114 @@ impl RpcSubscriptions { } } + pub fn add_logs_subscription( + &self, + address: Option, + include_votes: bool, + commitment: Option, + sub_id: SubscriptionId, + subscriber: Subscriber>, + ) { + let commitment = commitment.unwrap_or_else(CommitmentConfig::single_gossip); + + { + let mut subscriptions = if commitment.commitment == CommitmentLevel::SingleGossip { + self.subscriptions + .gossip_logs_subscriptions + .write() + .unwrap() + } else { + self.subscriptions.logs_subscriptions.write().unwrap() + }; + add_subscription( + &mut subscriptions, + (address, include_votes), + commitment, + sub_id, + subscriber, + 0, // last_notified_slot is not utilized for logs subscriptions + None, + ); + } + self.update_bank_transaction_log_keys(); + } + + pub fn remove_logs_subscription(&self, id: &SubscriptionId) -> bool { + let mut removed = { + let mut subscriptions = self.subscriptions.logs_subscriptions.write().unwrap(); + remove_subscription(&mut subscriptions, id) + }; + + if !removed { + removed = { + let mut subscriptions = self + .subscriptions + .gossip_logs_subscriptions + .write() + .unwrap(); + remove_subscription(&mut subscriptions, id) + }; + } + + if removed { + self.update_bank_transaction_log_keys(); + } + removed + } + + fn update_bank_transaction_log_keys(&self) { + // Grab a write lock for both `logs_subscriptions` and `gossip_logs_subscriptions`, to + // ensure `Bank::transaction_log_collector_config` is updated atomically. + let logs_subscriptions = self.subscriptions.logs_subscriptions.write().unwrap(); + let gossip_logs_subscriptions = self + .subscriptions + .gossip_logs_subscriptions + .write() + .unwrap(); + + let mut config = TransactionLogCollectorConfig::default(); + + let mut all = false; + let mut all_with_votes = false; + let mut mentioned_address = false; + for (address, with_votes) in logs_subscriptions + .keys() + .chain(gossip_logs_subscriptions.keys()) + { + match address { + None => { + if *with_votes { + all_with_votes = true; + } else { + all = true; + } + } + Some(address) => { + config.mentioned_addresses.insert(*address); + mentioned_address = true; + } + } + } + config.filter = if all_with_votes { + TransactionLogCollectorFilter::AllWithVotes + } else if all { + TransactionLogCollectorFilter::All + } else if mentioned_address { + TransactionLogCollectorFilter::OnlyMentionedAddresses + } else { + TransactionLogCollectorFilter::None + }; + + *self + .bank_forks + .read() + .unwrap() + .root_bank() + .transaction_log_collector_config + .write() + .unwrap() = config; + } + pub fn add_signature_subscription( &self, signature: Signature, @@ -847,8 +1030,9 @@ impl RpcSubscriptions { } } NotificationEntry::Bank(commitment_slots) => { - RpcSubscriptions::notify_accounts_programs_signatures( + RpcSubscriptions::notify_accounts_logs_programs_signatures( &subscriptions.account_subscriptions, + &subscriptions.logs_subscriptions, &subscriptions.program_subscriptions, &subscriptions.signature_subscriptions, &bank_forks, @@ -894,8 +1078,9 @@ impl RpcSubscriptions { highest_confirmed_slot: slot, ..CommitmentSlots::default() }; - RpcSubscriptions::notify_accounts_programs_signatures( + RpcSubscriptions::notify_accounts_logs_programs_signatures( &subscriptions.gossip_account_subscriptions, + &subscriptions.gossip_logs_subscriptions, &subscriptions.gossip_program_subscriptions, &subscriptions.gossip_signature_subscriptions, bank_forks, @@ -905,8 +1090,9 @@ impl RpcSubscriptions { ); } - fn notify_accounts_programs_signatures( + fn notify_accounts_logs_programs_signatures( account_subscriptions: &Arc, + logs_subscriptions: &Arc, program_subscriptions: &Arc, signature_subscriptions: &Arc, bank_forks: &Arc>, @@ -932,6 +1118,24 @@ impl RpcSubscriptions { } accounts_time.stop(); + let mut logs_time = Measure::start("logs"); + let logs: Vec<_> = { + let subs = logs_subscriptions.read().unwrap(); + subs.keys().cloned().collect() + }; + let mut num_logs_notified = 0; + for address in &logs { + num_logs_notified += Self::check_logs( + address, + bank_forks, + logs_subscriptions.clone(), + ¬ifier, + &commitment_slots, + ) + .len(); + } + logs_time.stop(); + let mut programs_time = Measure::start("programs"); let programs: Vec<_> = { let subs = program_subscriptions.read().unwrap(); @@ -990,6 +1194,9 @@ impl RpcSubscriptions { ("num_account_subscriptions", pubkeys.len(), i64), ("num_account_pubkeys_notified", num_pubkeys_notified, i64), ("accounts_time", accounts_time.as_us() as i64, i64), + ("num_logs_subscriptions", logs.len(), i64), + ("num_logs_notified", num_logs_notified, i64), + ("logs_time", logs_time.as_us() as i64, i64), ("num_program_subscriptions", programs.len(), i64), ("num_programs_notified", num_programs_notified, i64), ("programs_time", programs_time.as_us() as i64, i64), diff --git a/docs/src/developing/clients/jsonrpc-api.md b/docs/src/developing/clients/jsonrpc-api.md index a2f7cb7a44..89141e9fbc 100644 --- a/docs/src/developing/clients/jsonrpc-api.md +++ b/docs/src/developing/clients/jsonrpc-api.md @@ -63,6 +63,8 @@ gives a convenient interface for the RPC methods. - [Subscription Websocket](jsonrpc-api.md#subscription-websocket) - [accountSubscribe](jsonrpc-api.md#accountsubscribe) - [accountUnsubscribe](jsonrpc-api.md#accountunsubscribe) + - [logsSubscribe](jsonrpc-api.md#logssubscribe) + - [logsUnsubscribe](jsonrpc-api.md#logsunsubscribe) - [programSubscribe](jsonrpc-api.md#programsubscribe) - [programUnsubscribe](jsonrpc-api.md#programunsubscribe) - [signatureSubscribe](jsonrpc-api.md#signaturesubscribe) @@ -2876,7 +2878,7 @@ Request: "CM78CPUeXjn8o3yroDHxUtKsZZgoy4GPkPPXfouKNH12", { "encoding": "base64", - "commitment": "single" + "commitment": "root" } ] } @@ -2983,6 +2985,103 @@ Result: {"jsonrpc": "2.0","result": true,"id": 1} ``` +### logsSubscribe + +Subscribe to transaction logging. **UNSTABLE** + +#### Parameters: + +- `filter: |` - filter criteria for the logs to receive results by account type; currently supported: + - "all" - subscribe to all transactions except for simple vote transactions + - "allWithVotes" - subscribe to all transactions including simple vote transactions + - `{ "mentions": [ ] }` - subscribe to all transactions that mention the provided Pubkey (as base-58 encoded string) +- `` - (optional) Configuration object containing the following optional fields: + - (optional) [Commitment](jsonrpc-api.md#configuring-state-commitment) + +#### Results: + +- `` - Subscription id \(needed to unsubscribe\) + +#### Example: + +Request: +```json +{ + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + { + "mentions": [ "11111111111111111111111111111111" ] + } + { + "commitment": "max" + } + ] +} +{ + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ "all" ] +} +``` + +Result: +```json +{"jsonrpc": "2.0","result": 24040,"id": 1} +``` + +#### Notification Format: + +Base58 encoding: +```json +{ + "jsonrpc": "2.0", + "method": "logsNotification", + "params": { + "result": { + "context": { + "slot": 5208469 + }, + "value": { + "signature": "5h6xBEauJ3PK6SWCZ1PGjBvj8vDdWG3KpwATGy1ARAXFSDwt8GFXM7W5Ncn16wmqokgpiKRLuS83KUxyZyv2sUYv", + "err": null, + "logs": [ + "BPF program 83astBRguLMdt2h5U1Tpdq5tjFoJ6noeGwaY3mDLVcri success" + ] + } + }, + "subscription": 24040 + } +} +``` + +### logsUnsubscribe + +Unsubscribe from transaction logging + +#### Parameters: + +- `` - id of subscription to cancel + +#### Results: + +- `` - unsubscribe success message + +#### Example: + +Request: +```json +{"jsonrpc":"2.0", "id":1, "method":"logsUnsubscribe", "params":[0]} + +``` + +Result: +```json +{"jsonrpc": "2.0","result": true,"id": 1} +``` + ### programSubscribe Subscribe to a program to receive notifications when the lamports or data for a given account owned by the program changes @@ -3012,7 +3111,7 @@ Request: "11111111111111111111111111111111", { "encoding": "base64", - "commitment": "single" + "commitment": "max" } ] } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 96cbeb221a..fd7c979b4e 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -404,6 +404,45 @@ pub type InnerInstructionsList = Vec; /// A list of log messages emitted during a transaction pub type TransactionLogMessages = Vec; +#[derive(Serialize, Deserialize, AbiExample, AbiEnumVisitor, Debug, PartialEq)] +pub enum TransactionLogCollectorFilter { + All, + AllWithVotes, + None, + OnlyMentionedAddresses, +} + +impl Default for TransactionLogCollectorFilter { + fn default() -> Self { + Self::None + } +} + +#[derive(AbiExample, Debug, Default)] +pub struct TransactionLogCollectorConfig { + pub mentioned_addresses: HashSet, + pub filter: TransactionLogCollectorFilter, +} + +#[derive(AbiExample, Clone, Debug)] +pub struct TransactionLogInfo { + pub signature: Signature, + pub result: Result<()>, + pub is_vote: bool, + pub log_messages: TransactionLogMessages, +} + +#[derive(AbiExample, Default, Debug)] +pub struct TransactionLogCollector { + // All the logs collected for from this Bank. Exact contents depend on the + // active `TransactionLogCollectorFilter` + pub logs: Vec, + + // For each `mentioned_addresses`, maintain a list of indicies into `logs` to easily + // locate the logs from transactions that included the mentioned addresses. + pub mentioned_address_map: HashMap>, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub enum HashAgeKind { Extant, @@ -724,6 +763,13 @@ pub struct Bank { transaction_debug_keys: Option>>, + // Global configuration for how transaction logs should be collected across all banks + pub transaction_log_collector_config: Arc>, + + // Logs from transactions that this Bank executed collected according to the criteria in + // `transaction_log_collector_config` + pub transaction_log_collector: Arc>, + pub feature_set: Arc, } @@ -868,6 +914,8 @@ impl Bank { rewards_pool_pubkeys: parent.rewards_pool_pubkeys.clone(), cached_executors: RwLock::new((*parent.cached_executors.read().unwrap()).clone()), transaction_debug_keys: parent.transaction_debug_keys.clone(), + transaction_log_collector_config: parent.transaction_log_collector_config.clone(), + transaction_log_collector: Arc::new(RwLock::new(TransactionLogCollector::default())), feature_set: parent.feature_set.clone(), }; @@ -984,6 +1032,8 @@ impl Bank { CachedExecutors::new(MAX_CACHED_EXECUTORS), )))), transaction_debug_keys: debug_keys, + transaction_log_collector_config: new(), + transaction_log_collector: new(), feature_set: new(), }; bank.finish_init(genesis_config, additional_builtins); @@ -2117,7 +2167,10 @@ impl Bank { } /// Run transactions against a frozen bank without committing the results - pub fn simulate_transaction(&self, transaction: Transaction) -> (Result<()>, Vec) { + pub fn simulate_transaction( + &self, + transaction: Transaction, + ) -> (Result<()>, TransactionLogMessages) { assert!(self.is_frozen(), "simulation bank must be frozen"); let txs = &[transaction]; @@ -2127,7 +2180,7 @@ impl Bank { _loaded_accounts, executed, _inner_instructions, - transaction_logs, + log_messages, _retryable_transactions, _transaction_count, _signature_count, @@ -2142,7 +2195,7 @@ impl Bank { ); let transaction_result = executed[0].0.clone().map(|_| ()); - let log_messages = transaction_logs + let log_messages = log_messages .get(0) .map_or(vec![], |messages| messages.to_vec()); @@ -2182,6 +2235,7 @@ impl Bank { &self.feature_set, ) } + fn check_age( &self, txs: &[Transaction], @@ -2213,6 +2267,7 @@ impl Bank { }) .collect() } + fn check_signatures( &self, txs: &[Transaction], @@ -2249,6 +2304,7 @@ impl Bank { }) .collect() } + fn filter_by_vote_transactions( &self, txs: &[Transaction], @@ -2260,23 +2316,8 @@ impl Bank { .zip(lock_results.into_iter()) .map(|((_, tx), lock_res)| { if lock_res.0.is_ok() { - if tx.message.instructions.len() == 1 { - let instruction = &tx.message.instructions[0]; - let program_pubkey = - tx.message.account_keys[instruction.program_id_index as usize]; - if program_pubkey == solana_vote_program::id() { - if let Ok(vote_instruction) = - limited_deserialize::(&instruction.data) - { - match vote_instruction { - VoteInstruction::Vote(_) - | VoteInstruction::VoteSwitch(_, _) => { - return lock_res; - } - _ => {} - } - } - } + if is_simple_vote_transaction(tx) { + return lock_res; } error_counters.not_allowed_during_cluster_maintenance += 1; @@ -2598,7 +2639,7 @@ impl Bank { let mut signature_count: u64 = 0; let mut inner_instructions: Vec> = Vec::with_capacity(txs.len()); - let mut transaction_logs: Vec = Vec::with_capacity(txs.len()); + let mut transaction_log_messages = Vec::with_capacity(txs.len()); let bpf_compute_budget = self .bpf_compute_budget .unwrap_or_else(|| BpfComputeBudget::new(&self.feature_set)); @@ -2649,7 +2690,7 @@ impl Bank { .unwrap_or_default() .into(); - transaction_logs.push(log_messages); + transaction_log_messages.push(log_messages); } Self::compile_recorded_instructions( @@ -2686,7 +2727,10 @@ impl Bank { let mut tx_count: u64 = 0; let err_count = &mut error_counters.total; - for ((r, _hash_age_kind), tx) in executed.iter().zip(txs.iter()) { + let transaction_log_collector_config = + self.transaction_log_collector_config.read().unwrap(); + + for (i, ((r, _hash_age_kind), tx)) in executed.iter().zip(txs.iter()).enumerate() { if let Some(debug_keys) = &self.transaction_debug_keys { for key in &tx.message.account_keys { if debug_keys.contains(key) { @@ -2695,6 +2739,50 @@ impl Bank { } } } + + if transaction_log_collector_config.filter != TransactionLogCollectorFilter::None { + let mut transaction_log_collector = self.transaction_log_collector.write().unwrap(); + let transaction_log_index = transaction_log_collector.logs.len(); + + let mut mentioned_address = false; + if !transaction_log_collector_config + .mentioned_addresses + .is_empty() + { + for key in &tx.message.account_keys { + if transaction_log_collector_config + .mentioned_addresses + .contains(key) + { + transaction_log_collector + .mentioned_address_map + .entry(*key) + .or_default() + .push(transaction_log_index); + mentioned_address = true; + } + } + } + + let is_vote = is_simple_vote_transaction(tx); + + let store = match transaction_log_collector_config.filter { + TransactionLogCollectorFilter::All => !is_vote || mentioned_address, + TransactionLogCollectorFilter::AllWithVotes => true, + TransactionLogCollectorFilter::None => false, + TransactionLogCollectorFilter::OnlyMentionedAddresses => mentioned_address, + }; + + if store { + transaction_log_collector.logs.push(TransactionLogInfo { + signature: tx.signatures[0], + result: r.clone(), + is_vote, + log_messages: transaction_log_messages.get(i).cloned().unwrap_or_default(), + }); + } + } + if r.is_ok() { tx_count += 1; } else { @@ -2716,7 +2804,7 @@ impl Bank { loaded_accounts, executed, inner_instructions, - transaction_logs, + transaction_log_messages, retryable_txs, tx_count, signature_count, @@ -3673,6 +3761,26 @@ impl Bank { .load_by_program_slot(self.slot(), Some(program_id)) } + pub fn get_transaction_logs( + &self, + address: Option<&Pubkey>, + ) -> Option> { + let transaction_log_collector = self.transaction_log_collector.read().unwrap(); + + match address { + None => Some(transaction_log_collector.logs.clone()), + Some(address) => transaction_log_collector + .mentioned_address_map + .get(address) + .map(|log_indices| { + log_indices + .iter() + .map(|i| transaction_log_collector.logs[*i].clone()) + .collect() + }), + } + } + pub fn get_all_accounts_modified_since_parent(&self) -> Vec<(Pubkey, Account)> { self.rc.accounts.load_by_program_slot(self.slot(), None) } @@ -4377,6 +4485,21 @@ pub fn goto_end_of_slot(bank: &mut Bank) { } } +fn is_simple_vote_transaction(transaction: &Transaction) -> bool { + if transaction.message.instructions.len() == 1 { + let instruction = &transaction.message.instructions[0]; + let program_pubkey = + transaction.message.account_keys[instruction.program_id_index as usize]; + if program_pubkey == solana_vote_program::id() { + if let Ok(vote_instruction) = limited_deserialize::(&instruction.data) + { + return matches!(vote_instruction, VoteInstruction::Vote(_) | VoteInstruction::VoteSwitch(_, _)); + } + } + } + false +} + #[cfg(test)] pub(crate) mod tests { use super::*;