diff --git a/Cargo.lock b/Cargo.lock index a98589ef42..7d5affb5a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5892,6 +5892,7 @@ dependencies = [ name = "solana-pubsub-client" version = "1.15.0" dependencies = [ + "anyhow", "crossbeam-channel", "futures-util", "log", diff --git a/docs/src/developing/clients/jsonrpc-api.md b/docs/src/developing/clients/jsonrpc-api.md index 51aa290357..d60f1d7577 100644 --- a/docs/src/developing/clients/jsonrpc-api.md +++ b/docs/src/developing/clients/jsonrpc-api.md @@ -4184,7 +4184,7 @@ Result: ### programSubscribe -Subscribe to a program to receive notifications when the lamports or data for a given account owned by the program changes +Subscribe to a program to receive notifications when the lamports or data for an account owned by the given program changes #### Parameters: @@ -4352,7 +4352,7 @@ Result: ### signatureSubscribe -Subscribe to a transaction signature to receive notification when the transaction is confirmed On `signatureNotification`, the subscription is automatically cancelled +Subscribe to a transaction signature to receive notification when a given transaction is committed. On `signatureNotification`, the subscription is automatically cancelled. #### Parameters: diff --git a/pubsub-client/Cargo.toml b/pubsub-client/Cargo.toml index a984cf1232..3fc9c7adf6 100644 --- a/pubsub-client/Cargo.toml +++ b/pubsub-client/Cargo.toml @@ -29,6 +29,7 @@ tungstenite = { version = "0.17.2", features = ["rustls-tls-webpki-roots"] } url = "2.2.2" [dev-dependencies] +anyhow = "1.0.58" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/pubsub-client/src/nonblocking/pubsub_client.rs b/pubsub-client/src/nonblocking/pubsub_client.rs index 79eb1cea90..1e091db9dc 100644 --- a/pubsub-client/src/nonblocking/pubsub_client.rs +++ b/pubsub-client/src/nonblocking/pubsub_client.rs @@ -1,3 +1,171 @@ +//! A client for subscribing to messages from the RPC server. +//! +//! The [`PubsubClient`] implements [Solana WebSocket event +//! subscriptions][spec]. +//! +//! [spec]: https://docs.solana.com/developing/clients/jsonrpc-api#subscription-websocket +//! +//! This is a nonblocking (async) API. For a blocking API use the synchronous +//! client in [`crate::pubsub_client`]. +//! +//! A single `PubsubClient` client may be used to subscribe to many events via +//! subscription methods like [`PubsubClient::account_subscribe`]. These methods +//! return a [`PubsubClientResult`] of a pair, the first element being a +//! [`BoxStream`] of subscription-specific [`RpcResponse`]s, the second being an +//! unsubscribe closure, an asynchronous function that can be called and +//! `await`ed to unsubscribe. +//! +//! Note that `BoxStream` contains an immutable reference to the `PubsubClient` +//! that created it. This makes `BoxStream` not `Send`, forcing it to stay in +//! the same task as its `PubsubClient`. `PubsubClient` though is `Send` and +//! `Sync`, and can be shared between tasks by putting it in an `Arc`. Thus +//! one viable pattern to creating multiple subscriptions is: +//! +//! - create an `Arc` +//! - spawn one task for each subscription, sharing the `PubsubClient`. +//! - in each task: +//! - create a subscription +//! - send the `UnsubscribeFn` to another task to handle shutdown +//! - loop while receiving messages from the subscription +//! +//! This pattern is illustrated in the example below. +//! +//! By default the [`block_subscribe`] and [`vote_subscribe`] events are +//! disabled on RPC nodes. They can be enabled by passing +//! `--rpc-pubsub-enable-block-subscription` and +//! `--rpc-pubsub-enable-vote-subscription` to `solana-validator`. When these +//! methods are disabled, the RPC server will return a "Method not found" error +//! message. +//! +//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe +//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe +//! +//! # Examples +//! +//! Demo two async `PubsubClient` subscriptions with clean shutdown. +//! +//! This spawns a task for each subscription type, each of which subscribes and +//! sends back a ready message and an unsubscribe channel (closure), then loops +//! on printing messages. The main task then waits for user input before +//! unsubscribing and waiting on the tasks. +//! +//! ``` +//! use anyhow::Result; +//! use futures_util::StreamExt; +//! use solana_pubsub_client::nonblocking::pubsub_client::PubsubClient; +//! use std::sync::Arc; +//! use tokio::io::AsyncReadExt; +//! use tokio::sync::mpsc::unbounded_channel; +//! +//! pub async fn watch_subscriptions( +//! websocket_url: &str, +//! ) -> Result<()> { +//! +//! // Subscription tasks will send a ready signal when they have subscribed. +//! let (ready_sender, mut ready_receiver) = unbounded_channel::<()>(); +//! +//! // Channel to receive unsubscribe channels (actually closures). +//! // These receive a pair of `(Box BoxFuture<'static, ()> + Send>), &'static str)`, +//! // where the first is a closure to call to unsubscribe, the second is the subscription name. +//! let (unsubscribe_sender, mut unsubscribe_receiver) = unbounded_channel::<(_, &'static str)>(); +//! +//! // The `PubsubClient` must be `Arc`ed to share it across tasks. +//! let pubsub_client = Arc::new(PubsubClient::new(websocket_url).await?); +//! +//! let mut join_handles = vec![]; +//! +//! join_handles.push(("slot", tokio::spawn({ +//! // Clone things we need before moving their clones into the `async move` block. +//! // +//! // The subscriptions have to be made from the tasks that will receive the subscription messages, +//! // because the subscription streams hold a reference to the `PubsubClient`. +//! // Otherwise we would just subscribe on the main task and send the receivers out to other tasks. +//! +//! let ready_sender = ready_sender.clone(); +//! let unsubscribe_sender = unsubscribe_sender.clone(); +//! let pubsub_client = Arc::clone(&pubsub_client); +//! async move { +//! let (mut slot_notifications, slot_unsubscribe) = +//! pubsub_client.slot_subscribe().await?; +//! +//! // With the subscription started, +//! // send a signal back to the main task for synchronization. +//! ready_sender.send(()).expect("channel"); +//! +//! // Send the unsubscribe closure back to the main task. +//! unsubscribe_sender.send((slot_unsubscribe, "slot")) +//! .map_err(|e| format!("{}", e)).expect("channel"); +//! +//! // Drop senders so that the channels can close. +//! // The main task will receive until channels are closed. +//! drop((ready_sender, unsubscribe_sender)); +//! +//! // Do something with the subscribed messages. +//! // This loop will end once the main task unsubscribes. +//! while let Some(slot_info) = slot_notifications.next().await { +//! println!("------------------------------------------------------------"); +//! println!("slot pubsub result: {:?}", slot_info); +//! } +//! +//! // This type hint is necessary to allow the `async move` block to use `?`. +//! Ok::<_, anyhow::Error>(()) +//! } +//! }))); +//! +//! join_handles.push(("root", tokio::spawn({ +//! let ready_sender = ready_sender.clone(); +//! let unsubscribe_sender = unsubscribe_sender.clone(); +//! let pubsub_client = Arc::clone(&pubsub_client); +//! async move { +//! let (mut root_notifications, root_unsubscribe) = +//! pubsub_client.root_subscribe().await?; +//! +//! ready_sender.send(()).expect("channel"); +//! unsubscribe_sender.send((root_unsubscribe, "root")) +//! .map_err(|e| format!("{}", e)).expect("channel"); +//! drop((ready_sender, unsubscribe_sender)); +//! +//! while let Some(root) = root_notifications.next().await { +//! println!("------------------------------------------------------------"); +//! println!("root pubsub result: {:?}", root); +//! } +//! +//! Ok::<_, anyhow::Error>(()) +//! } +//! }))); +//! +//! // Drop these senders so that the channels can close +//! // and their receivers return `None` below. +//! drop(ready_sender); +//! drop(unsubscribe_sender); +//! +//! // Wait until all subscribers are ready before proceeding with application logic. +//! while let Some(_) = ready_receiver.recv().await { } +//! +//! // Do application logic here. +//! +//! // Wait for input or some application-specific shutdown condition. +//! tokio::io::stdin().read_u8().await?; +//! +//! // Unsubscribe from everything, which will shutdown all the tasks. +//! while let Some((unsubscribe, name)) = unsubscribe_receiver.recv().await { +//! println!("unsubscribing from {}", name); +//! unsubscribe().await +//! } +//! +//! // Wait for the tasks. +//! for (name, handle) in join_handles { +//! println!("waiting on task {}", name); +//! if let Ok(Err(e)) = handle.await { +//! println!("task {} failed: {}", name, e); +//! } +//! } +//! +//! Ok(()) +//! } +//! # Ok::<(), anyhow::Error>(()) +//! ``` + use { futures_util::{ future::{ready, BoxFuture, FutureExt}, @@ -85,6 +253,9 @@ type RequestMsg = ( oneshot::Sender>, ); +/// A client for subscribing to messages from the RPC server. +/// +/// See the [module documentation][self]. #[derive(Debug)] pub struct PubsubClient { subscribe_tx: mpsc::UnboundedSender, @@ -175,6 +346,15 @@ impl PubsubClient { )) } + /// Subscribe to account events. + /// + /// Receives messages of type [`UiAccount`] when an account's lamports or data changes. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`accountSubscribe`] RPC method. + /// + /// [`accountSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#accountsubscribe pub async fn account_subscribe( &self, pubkey: &Pubkey, @@ -184,6 +364,18 @@ impl PubsubClient { self.subscribe("account", params).await } + /// Subscribe to block events. + /// + /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized. + /// + /// This method is disabled by default. It can be enabled by passing + /// `--rpc-pubsub-enable-block-subscription` to `solana-validator`. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`blockSubscribe`] RPC method. + /// + /// [`blockSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#blocksubscribe---unstable-disabled-by-default pub async fn block_subscribe( &self, filter: RpcBlockSubscribeFilter, @@ -192,6 +384,15 @@ impl PubsubClient { self.subscribe("block", json!([filter, config])).await } + /// Subscribe to transaction log events. + /// + /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`logsSubscribe`] RPC method. + /// + /// [`logsSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#logssubscribe pub async fn logs_subscribe( &self, filter: RpcTransactionLogsFilter, @@ -200,6 +401,16 @@ impl PubsubClient { self.subscribe("logs", json!([filter, config])).await } + /// Subscribe to program account events. + /// + /// Receives messages of type [`RpcKeyedAccount`] when an account owned + /// by the given program changes. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`programSubscribe`] RPC method. + /// + /// [`programSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#programsubscribe pub async fn program_subscribe( &self, pubkey: &Pubkey, @@ -223,14 +434,52 @@ impl PubsubClient { self.subscribe("program", params).await } + /// Subscribe to vote events. + /// + /// Receives messages of type [`RpcVote`] when a new vote is observed. These + /// votes are observed prior to confirmation and may never be confirmed. + /// + /// This method is disabled by default. It can be enabled by passing + /// `--rpc-pubsub-enable-vote-subscription` to `solana-validator`. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`voteSubscribe`] RPC method. + /// + /// [`voteSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#votesubscribe---unstable-disabled-by-default pub async fn vote_subscribe(&self) -> SubscribeResult<'_, RpcVote> { self.subscribe("vote", json!([])).await } + /// Subscribe to root events. + /// + /// Receives messages of type [`Slot`] when a new [root] is set by the + /// validator. + /// + /// [root]: https://docs.solana.com/terminology#root + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`rootSubscribe`] RPC method. + /// + /// [`rootSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#rootsubscribe pub async fn root_subscribe(&self) -> SubscribeResult<'_, Slot> { self.subscribe("root", json!([])).await } + /// Subscribe to transaction confirmation events. + /// + /// Receives messages of type [`RpcSignatureResult`] when a transaction + /// with the given signature is committed. + /// + /// This is a subscription to a single notification. It is automatically + /// cancelled by the server once the notification is sent. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`signatureSubscribe`] RPC method. + /// + /// [`signatureSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#signaturesubscribe pub async fn signature_subscribe( &self, signature: &Signature, @@ -240,10 +489,33 @@ impl PubsubClient { self.subscribe("signature", params).await } + /// Subscribe to slot events. + /// + /// Receives messages of type [`SlotInfo`] when a slot is processed. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`slotSubscribe`] RPC method. + /// + /// [`slotSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#slotsubscribe pub async fn slot_subscribe(&self) -> SubscribeResult<'_, SlotInfo> { self.subscribe("slot", json!([])).await } + /// Subscribe to slot update events. + /// + /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur. + /// + /// Note that this method operates differently than other subscriptions: + /// instead of sending the message to a reciever on a channel, it accepts a + /// `handler` callback that processes the message directly. This processing + /// occurs on another thread. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method. + /// + /// [`slotUpdatesSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#slotsupdatessubscribe---unstable pub async fn slot_updates_subscribe(&self) -> SubscribeResult<'_, SlotUpdate> { self.subscribe("slotsUpdates", json!([])).await } diff --git a/pubsub-client/src/pubsub_client.rs b/pubsub-client/src/pubsub_client.rs index 92e5f0d4af..1f25972b0f 100644 --- a/pubsub-client/src/pubsub_client.rs +++ b/pubsub-client/src/pubsub_client.rs @@ -1,3 +1,91 @@ +//! A client for subscribing to messages from the RPC server. +//! +//! The [`PubsubClient`] implements [Solana WebSocket event +//! subscriptions][spec]. +//! +//! [spec]: https://docs.solana.com/developing/clients/jsonrpc-api#subscription-websocket +//! +//! This is a blocking API. For a non-blocking API use the asynchronous client +//! in [`crate::nonblocking::pubsub_client`]. +//! +//! `PubsubClient` contains static methods to subscribe to events, like +//! [`PubsubClient::account_subscribe`]. These methods each return their own +//! subscription type, like [`AccountSubscription`], that are typedefs of +//! tuples, the first element being a handle to the subscription, like +//! [`AccountSubscription`], the second a [`Receiver`] of [`RpcResponse`] of +//! whichever type is appropriate for the subscription. The subscription handle +//! is a typedef of [`PubsubClientSubscription`], and it must remain live for +//! the receiver to continue receiving messages. +//! +//! Because this is a blocking API, with blocking receivers, a reasonable +//! pattern for using this API is to move each event receiver to its own thread +//! to block on messages, while holding all subscription handles on a single +//! primary thread. +//! +//! While `PubsubClientSubscription` contains methods for shutting down, +//! [`PubsubClientSubscription::send_unsubscribe`], and +//! [`PubsubClientSubscription::shutdown`], because its internal receivers block +//! on events from the server, these subscriptions cannot actually be shutdown +//! reliably. For a non-blocking, cancelable API, use the asynchronous client +//! in [`crate::nonblocking::pubsub_client`]. +//! +//! By default the [`block_subscribe`] and [`vote_subscribe`] events are +//! disabled on RPC nodes. They can be enabled by passing +//! `--rpc-pubsub-enable-block-subscription` and +//! `--rpc-pubsub-enable-vote-subscription` to `solana-validator`. When these +//! methods are disabled, the RPC server will return a "Method not found" error +//! message. +//! +//! [`block_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.block_subscribe +//! [`vote_subscribe`]: https://docs.rs/solana-rpc/latest/solana_rpc/rpc_pubsub/trait.RpcSolPubSub.html#tymethod.vote_subscribe +//! +//! # Examples +//! +//! This example subscribes to account events and then loops forever receiving +//! them. +//! +//! ``` +//! use anyhow::Result; +//! use solana_sdk::commitment_config::CommitmentConfig; +//! use solana_pubsub_client::pubsub_client::PubsubClient; +//! use solana_rpc_client_api::config::RpcAccountInfoConfig; +//! use solana_sdk::pubkey::Pubkey; +//! use std::thread; +//! +//! fn get_account_updates(account_pubkey: Pubkey) -> Result<()> { +//! let url = "wss://api.devnet.solana.com/"; +//! +//! let (mut account_subscription_client, account_subscription_receiver) = +//! PubsubClient::account_subscribe( +//! url, +//! &account_pubkey, +//! Some(RpcAccountInfoConfig { +//! encoding: None, +//! data_slice: None, +//! commitment: Some(CommitmentConfig::confirmed()), +//! min_context_slot: None, +//! }), +//! )?; +//! +//! loop { +//! match account_subscription_receiver.recv() { +//! Ok(response) => { +//! println!("account subscription response: {:?}", response); +//! } +//! Err(e) => { +//! println!("account subscription error: {:?}", e); +//! break; +//! } +//! } +//! } +//! +//! Ok(()) +//! } +//! # +//! # get_account_updates(solana_sdk::pubkey::new_rand()); +//! # Ok::<(), anyhow::Error>(()) +//! ``` + pub use crate::nonblocking::pubsub_client::PubsubClientError; use { crossbeam_channel::{unbounded, Receiver, Sender}, @@ -36,6 +124,11 @@ use { url::Url, }; +/// A subscription. +/// +/// The subscription is unsubscribed on drop, and note that unsubscription (and +/// thus drop) time is unbounded. See +/// [`PubsubClientSubscription::send_unsubscribe`]. pub struct PubsubClientSubscription where T: DeserializeOwned, @@ -95,6 +188,14 @@ where ))) } + /// Send an unsubscribe message to the server. + /// + /// Note that this will block as long as the internal subscription receiver + /// is waiting on messages from the server, and this can take an unbounded + /// amount of time if the server does not send any messages. + /// + /// If a pubsub client needs to shutdown reliably it should use + /// the async client in [`crate::nonblocking::pubsub_client`]. pub fn send_unsubscribe(&self) -> Result<(), PubsubClientError> { let method = format!("{}Unsubscribe", self.operation); self.socket @@ -167,6 +268,14 @@ where ))) } + /// Shutdown the internel message receiver and wait for its thread to exit. + /// + /// Note that this will block as long as the subscription receiver is + /// waiting on messages from the server, and this can take an unbounded + /// amount of time if the server does not send any messages. + /// + /// If a pubsub client needs to shutdown reliably it should use + /// the async client in [`crate::nonblocking::pubsub_client`]. pub fn shutdown(&mut self) -> std::thread::Result<()> { if self.t_cleanup.is_some() { info!("websocket thread - shutting down"); @@ -221,6 +330,9 @@ pub type VoteSubscription = (PubsubVoteClientSubscription, Receiver); pub type PubsubRootClientSubscription = PubsubClientSubscription; pub type RootSubscription = (PubsubRootClientSubscription, Receiver); +/// A client for subscribing to messages from the RPC server. +/// +/// See the [module documentation][self]. pub struct PubsubClient {} fn connect_with_retry( @@ -258,6 +370,15 @@ fn connect_with_retry( } impl PubsubClient { + /// Subscribe to account events. + /// + /// Receives messages of type [`UiAccount`] when an account's lamports or data changes. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`accountSubscribe`] RPC method. + /// + /// [`accountSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#accountsubscribe pub fn account_subscribe( url: &str, pubkey: &Pubkey, @@ -299,6 +420,18 @@ impl PubsubClient { Ok((result, receiver)) } + /// Subscribe to block events. + /// + /// Receives messages of type [`RpcBlockUpdate`] when a block is confirmed or finalized. + /// + /// This method is disabled by default. It can be enabled by passing + /// `--rpc-pubsub-enable-block-subscription` to `solana-validator`. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`blockSubscribe`] RPC method. + /// + /// [`blockSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#blocksubscribe---unstable-disabled-by-default pub fn block_subscribe( url: &str, filter: RpcBlockSubscribeFilter, @@ -338,6 +471,15 @@ impl PubsubClient { Ok((result, receiver)) } + /// Subscribe to transaction log events. + /// + /// Receives messages of type [`RpcLogsResponse`] when a transaction is committed. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`logsSubscribe`] RPC method. + /// + /// [`logsSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#logssubscribe pub fn logs_subscribe( url: &str, filter: RpcTransactionLogsFilter, @@ -377,6 +519,16 @@ impl PubsubClient { Ok((result, receiver)) } + /// Subscribe to program account events. + /// + /// Receives messages of type [`RpcKeyedAccount`] when an account owned + /// by the given program changes. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`programSubscribe`] RPC method. + /// + /// [`programSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#programsubscribe pub fn program_subscribe( url: &str, pubkey: &Pubkey, @@ -429,6 +581,19 @@ impl PubsubClient { Ok((result, receiver)) } + /// Subscribe to vote events. + /// + /// Receives messages of type [`RpcVote`] when a new vote is observed. These + /// votes are observed prior to confirmation and may never be confirmed. + /// + /// This method is disabled by default. It can be enabled by passing + /// `--rpc-pubsub-enable-vote-subscription` to `solana-validator`. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`voteSubscribe`] RPC method. + /// + /// [`voteSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#votesubscribe---unstable-disabled-by-default pub fn vote_subscribe(url: &str) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; @@ -462,6 +627,18 @@ impl PubsubClient { Ok((result, receiver)) } + /// Subscribe to root events. + /// + /// Receives messages of type [`Slot`] when a new [root] is set by the + /// validator. + /// + /// [root]: https://docs.solana.com/terminology#root + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`rootSubscribe`] RPC method. + /// + /// [`rootSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#rootsubscribe pub fn root_subscribe(url: &str) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; @@ -495,6 +672,19 @@ impl PubsubClient { Ok((result, receiver)) } + /// Subscribe to transaction confirmation events. + /// + /// Receives messages of type [`RpcSignatureResult`] when a transaction + /// with the given signature is committed. + /// + /// This is a subscription to a single notification. It is automatically + /// cancelled by the server once the notification is sent. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`signatureSubscribe`] RPC method. + /// + /// [`signatureSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#signaturesubscribe pub fn signature_subscribe( url: &str, signature: &Signature, @@ -537,6 +727,15 @@ impl PubsubClient { Ok((result, receiver)) } + /// Subscribe to slot events. + /// + /// Receives messages of type [`SlotInfo`] when a slot is processed. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`slotSubscribe`] RPC method. + /// + /// [`slotSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#slotsubscribe pub fn slot_subscribe(url: &str) -> Result { let url = Url::parse(url)?; let socket = connect_with_retry(url)?; @@ -571,6 +770,20 @@ impl PubsubClient { Ok((result, receiver)) } + /// Subscribe to slot update events. + /// + /// Receives messages of type [`SlotUpdate`] when various updates to a slot occur. + /// + /// Note that this method operates differently than other subscriptions: + /// instead of sending the message to a reciever on a channel, it accepts a + /// `handler` callback that processes the message directly. This processing + /// occurs on another thread. + /// + /// # RPC Reference + /// + /// This method corresponds directly to the [`slotUpdatesSubscribe`] RPC method. + /// + /// [`slotUpdatesSubscribe`]: https://docs.solana.com/developing/clients/jsonrpc-api#slotsupdatessubscribe---unstable pub fn slot_updates_subscribe( url: &str, handler: impl Fn(SlotUpdate) + Send + 'static, diff --git a/rpc-client/src/rpc_client.rs b/rpc-client/src/rpc_client.rs index 03c4f9a392..d4705954c1 100644 --- a/rpc-client/src/rpc_client.rs +++ b/rpc-client/src/rpc_client.rs @@ -5,6 +5,9 @@ //! [JSON-RPC], using the [`RpcClient`] type. //! //! [JSON-RPC]: https://www.jsonrpc.org/specification +//! +//! This is a blocking API. For a non-blocking API use the asynchronous client +//! in [`crate::nonblocking::rpc_client`]. pub use crate::mock_sender::Mocks; #[allow(deprecated)]