RPC PubSub now uses a well-known socket
This commit is contained in:
parent
32fc0cd7e9
commit
537436bd5e
|
@ -5,12 +5,19 @@ Solana nodes accept HTTP requests using the [JSON-RPC 2.0](https://www.jsonrpc.o
|
||||||
|
|
||||||
To interact with a Solana node inside a JavaScript application, use the [solana-web3.js](https://github.com/solana-labs/solana-web3.js) library, which gives a convenient interface for the RPC methods.
|
To interact with a Solana node inside a JavaScript application, use the [solana-web3.js](https://github.com/solana-labs/solana-web3.js) library, which gives a convenient interface for the RPC methods.
|
||||||
|
|
||||||
RPC Endpoint
|
RPC HTTP Endpoint
|
||||||
---
|
---
|
||||||
|
|
||||||
**Default port:** 8899
|
**Default port:** 8899
|
||||||
eg. http://localhost:8899, http://192.168.1.88:8899
|
eg. http://localhost:8899, http://192.168.1.88:8899
|
||||||
|
|
||||||
|
RPC PubSub WebSocket Endpoint
|
||||||
|
---
|
||||||
|
|
||||||
|
**Default port:** 8900
|
||||||
|
eg. ws://localhost:8900, http://192.168.1.88:8900
|
||||||
|
|
||||||
|
|
||||||
Methods
|
Methods
|
||||||
---
|
---
|
||||||
|
|
||||||
|
@ -234,33 +241,10 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
### startSubscriptionChannel
|
|
||||||
Open a socket on the node for JSON-RPC subscription requests
|
|
||||||
|
|
||||||
##### Parameters:
|
|
||||||
None
|
|
||||||
|
|
||||||
##### Results:
|
|
||||||
* `string` - "port", open websocket port
|
|
||||||
* `string` - "path", unique key to use as websocket path
|
|
||||||
|
|
||||||
##### Example:
|
|
||||||
```bash
|
|
||||||
// Request
|
|
||||||
curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc": "2.0","id":1,"method":"startSubscriptionChannel"}' http://localhost:8899
|
|
||||||
|
|
||||||
// Result
|
|
||||||
{"jsonrpc":"2.0","result":{"port":9876,"path":"BRbmMXn71cKfzXjFsmrTsWsXuQwbjXbwKdoRwVw1FRA3"},"id":1}
|
|
||||||
```
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
### Subscription Websocket
|
### Subscription Websocket
|
||||||
After opening a subscription socket with the `subscriptionChannel` JSON-RPC request method, submit subscription requests via websocket protocol
|
After connect to the RPC PubSub websocket at `ws://<ADDRESS>/`:
|
||||||
Connect to the websocket at `ws://<ADDRESS>/<PATH>` returned from the request
|
|
||||||
- Submit subscription requests to the websocket using the methods below
|
- Submit subscription requests to the websocket using the methods below
|
||||||
- Multiple subscriptions may be active at once
|
- Multiple subscriptions may be active at once
|
||||||
- The subscription-channel socket will close when client closes websocket. To create new subscriptions, send a new `subscriptionChannel` JSON-RPC request.
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@ use leader_scheduler::LeaderScheduler;
|
||||||
use ledger::read_ledger;
|
use ledger::read_ledger;
|
||||||
use ncp::Ncp;
|
use ncp::Ncp;
|
||||||
use rpc::{JsonRpcService, RPC_PORT};
|
use rpc::{JsonRpcService, RPC_PORT};
|
||||||
|
use rpc_pubsub::PubSubService;
|
||||||
use rpu::Rpu;
|
use rpu::Rpu;
|
||||||
use service::Service;
|
use service::Service;
|
||||||
use signature::{Keypair, KeypairUtil};
|
use signature::{Keypair, KeypairUtil};
|
||||||
|
@ -88,6 +89,7 @@ pub struct Fullnode {
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
rpu: Option<Rpu>,
|
rpu: Option<Rpu>,
|
||||||
rpc_service: JsonRpcService,
|
rpc_service: JsonRpcService,
|
||||||
|
rpc_pubsub_service: PubSubService,
|
||||||
ncp: Ncp,
|
ncp: Ncp,
|
||||||
bank: Arc<Bank>,
|
bank: Arc<Bank>,
|
||||||
cluster_info: Arc<RwLock<ClusterInfo>>,
|
cluster_info: Arc<RwLock<ClusterInfo>>,
|
||||||
|
@ -278,6 +280,12 @@ impl Fullnode {
|
||||||
// Drone location/id will need to be handled a different way as soon as leader rotation begins
|
// Drone location/id will need to be handled a different way as soon as leader rotation begins
|
||||||
let rpc_service = JsonRpcService::new(&bank, &cluster_info, rpc_addr, exit.clone());
|
let rpc_service = JsonRpcService::new(&bank, &cluster_info, rpc_addr, exit.clone());
|
||||||
|
|
||||||
|
let rpc_pubsub_addr = SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::from(0)),
|
||||||
|
rpc_port.unwrap_or(RPC_PORT) + 1,
|
||||||
|
);
|
||||||
|
let rpc_pubsub_service = PubSubService::new(&bank, rpc_pubsub_addr, exit.clone());
|
||||||
|
|
||||||
let ncp = Ncp::new(
|
let ncp = Ncp::new(
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
shared_window.clone(),
|
shared_window.clone(),
|
||||||
|
@ -373,6 +381,7 @@ impl Fullnode {
|
||||||
rpu,
|
rpu,
|
||||||
ncp,
|
ncp,
|
||||||
rpc_service,
|
rpc_service,
|
||||||
|
rpc_pubsub_service,
|
||||||
node_role,
|
node_role,
|
||||||
ledger_path: ledger_path.to_owned(),
|
ledger_path: ledger_path.to_owned(),
|
||||||
exit,
|
exit,
|
||||||
|
@ -567,6 +576,7 @@ impl Service for Fullnode {
|
||||||
}
|
}
|
||||||
self.ncp.join()?;
|
self.ncp.join()?;
|
||||||
self.rpc_service.join()?;
|
self.rpc_service.join()?;
|
||||||
|
self.rpc_pubsub_service.join()?;
|
||||||
|
|
||||||
match self.node_role {
|
match self.node_role {
|
||||||
Some(NodeRole::Validator(validator_service)) => {
|
Some(NodeRole::Validator(validator_service)) => {
|
||||||
|
|
58
src/rpc.rs
58
src/rpc.rs
|
@ -3,15 +3,12 @@
|
||||||
use bank::{Bank, BankError};
|
use bank::{Bank, BankError};
|
||||||
use bincode::deserialize;
|
use bincode::deserialize;
|
||||||
use bs58;
|
use bs58;
|
||||||
use cluster_info::{ClusterInfo, FULLNODE_PORT_RANGE};
|
use cluster_info::ClusterInfo;
|
||||||
use drone::DRONE_PORT;
|
use drone::DRONE_PORT;
|
||||||
use jsonrpc_core::*;
|
use jsonrpc_core::*;
|
||||||
use jsonrpc_http_server::*;
|
use jsonrpc_http_server::*;
|
||||||
use jsonrpc_macros::pubsub::Sink;
|
|
||||||
use netutil::find_available_port_in_range;
|
|
||||||
use rpc_pubsub::{PubSubService, SubscriptionResponse};
|
|
||||||
use service::Service;
|
use service::Service;
|
||||||
use signature::{Keypair, KeypairUtil, Signature};
|
use signature::Signature;
|
||||||
use solana_program_interface::account::Account;
|
use solana_program_interface::account::Account;
|
||||||
use solana_program_interface::pubkey::Pubkey;
|
use solana_program_interface::pubkey::Pubkey;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
|
@ -63,13 +60,10 @@ impl JsonRpcService {
|
||||||
warn!("JSON RPC service unavailable: unable to bind to RPC port {}. \nMake sure this port is not already in use by another application", rpc_addr.port());
|
warn!("JSON RPC service unavailable: unable to bind to RPC port {}. \nMake sure this port is not already in use by another application", rpc_addr.port());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
loop {
|
while !exit.load(Ordering::Relaxed) {
|
||||||
if exit.load(Ordering::Relaxed) {
|
|
||||||
server.unwrap().close();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
sleep(Duration::from_millis(100));
|
sleep(Duration::from_millis(100));
|
||||||
}
|
}
|
||||||
|
server.unwrap().close();
|
||||||
()
|
()
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
@ -132,9 +126,6 @@ build_rpc_trait! {
|
||||||
|
|
||||||
#[rpc(meta, name = "sendTransaction")]
|
#[rpc(meta, name = "sendTransaction")]
|
||||||
fn send_transaction(&self, Self::Metadata, Vec<u8>) -> Result<String>;
|
fn send_transaction(&self, Self::Metadata, Vec<u8>) -> Result<String>;
|
||||||
|
|
||||||
#[rpc(meta, name = "startSubscriptionChannel")]
|
|
||||||
fn start_subscription_channel(&self, Self::Metadata) -> Result<SubscriptionResponse>;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,22 +205,6 @@ impl RpcSol for RpcSolImpl {
|
||||||
})?;
|
})?;
|
||||||
Ok(bs58::encode(tx.signature).into_string())
|
Ok(bs58::encode(tx.signature).into_string())
|
||||||
}
|
}
|
||||||
fn start_subscription_channel(&self, meta: Self::Metadata) -> Result<SubscriptionResponse> {
|
|
||||||
let port: u16 = find_available_port_in_range(FULLNODE_PORT_RANGE).map_err(|_| Error {
|
|
||||||
code: ErrorCode::InternalError,
|
|
||||||
message: "No available port in range".into(),
|
|
||||||
data: None,
|
|
||||||
})?;
|
|
||||||
let mut pubsub_addr = meta.rpc_addr;
|
|
||||||
pubsub_addr.set_port(port);
|
|
||||||
let pubkey = Keypair::new().pubkey();
|
|
||||||
let _pubsub_service =
|
|
||||||
PubSubService::new(&meta.request_processor.bank, pubsub_addr, pubkey, meta.exit);
|
|
||||||
Ok(SubscriptionResponse {
|
|
||||||
port,
|
|
||||||
path: pubkey.to_string(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct JsonRpcRequestProcessor {
|
pub struct JsonRpcRequestProcessor {
|
||||||
|
@ -264,31 +239,6 @@ impl JsonRpcRequestProcessor {
|
||||||
fn get_transaction_count(&self) -> Result<u64> {
|
fn get_transaction_count(&self) -> Result<u64> {
|
||||||
Ok(self.bank.transaction_count() as u64)
|
Ok(self.bank.transaction_count() as u64)
|
||||||
}
|
}
|
||||||
pub fn add_account_subscription(
|
|
||||||
&self,
|
|
||||||
bank_sub_id: Pubkey,
|
|
||||||
pubkey: Pubkey,
|
|
||||||
sink: Sink<Account>,
|
|
||||||
) {
|
|
||||||
self.bank
|
|
||||||
.add_account_subscription(bank_sub_id, pubkey, sink);
|
|
||||||
}
|
|
||||||
pub fn remove_account_subscription(&self, bank_sub_id: &Pubkey, pubkey: &Pubkey) {
|
|
||||||
self.bank.remove_account_subscription(bank_sub_id, pubkey);
|
|
||||||
}
|
|
||||||
pub fn add_signature_subscription(
|
|
||||||
&self,
|
|
||||||
bank_sub_id: Pubkey,
|
|
||||||
signature: Signature,
|
|
||||||
sink: Sink<RpcSignatureStatus>,
|
|
||||||
) {
|
|
||||||
self.bank
|
|
||||||
.add_signature_subscription(bank_sub_id, signature, sink);
|
|
||||||
}
|
|
||||||
pub fn remove_signature_subscription(&self, bank_sub_id: &Pubkey, signature: &Signature) {
|
|
||||||
self.bank
|
|
||||||
.remove_signature_subscription(bank_sub_id, signature);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_leader_addr(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> {
|
fn get_leader_addr(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> {
|
||||||
|
|
|
@ -5,10 +5,10 @@ use bs58;
|
||||||
use jsonrpc_core::futures::Future;
|
use jsonrpc_core::futures::Future;
|
||||||
use jsonrpc_core::*;
|
use jsonrpc_core::*;
|
||||||
use jsonrpc_macros::pubsub;
|
use jsonrpc_macros::pubsub;
|
||||||
use jsonrpc_pubsub::{PubSubHandler, PubSubMetadata, Session, SubscriptionId};
|
use jsonrpc_pubsub::{PubSubHandler, Session, SubscriptionId};
|
||||||
use jsonrpc_ws_server::ws;
|
|
||||||
use jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder};
|
use jsonrpc_ws_server::{RequestContext, Sender, ServerBuilder};
|
||||||
use rpc::{JsonRpcRequestProcessor, RpcSignatureStatus};
|
use rpc::{JsonRpcRequestProcessor, RpcSignatureStatus};
|
||||||
|
use service::Service;
|
||||||
use signature::{Keypair, KeypairUtil, Signature};
|
use signature::{Keypair, KeypairUtil, Signature};
|
||||||
use solana_program_interface::account::Account;
|
use solana_program_interface::account::Account;
|
||||||
use solana_program_interface::pubkey::Pubkey;
|
use solana_program_interface::pubkey::Pubkey;
|
||||||
|
@ -16,8 +16,8 @@ use std::collections::HashMap;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::{atomic, Arc, Mutex, RwLock};
|
use std::sync::{atomic, Arc, RwLock};
|
||||||
use std::thread::{sleep, Builder, JoinHandle};
|
use std::thread::{self, sleep, Builder, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub enum ClientState {
|
pub enum ClientState {
|
||||||
|
@ -25,87 +25,59 @@ pub enum ClientState {
|
||||||
Init(Sender),
|
Init(Sender),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
pub struct PubSubService {
|
||||||
pub struct SubscriptionResponse {
|
thread_hdl: JoinHandle<()>,
|
||||||
pub port: u16,
|
|
||||||
pub path: String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct PubSubService {
|
impl Service for PubSubService {
|
||||||
_thread_hdl: JoinHandle<()>,
|
type JoinReturnType = ();
|
||||||
|
|
||||||
|
fn join(self) -> thread::Result<()> {
|
||||||
|
self.thread_hdl.join()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PubSubService {
|
impl PubSubService {
|
||||||
pub fn new(
|
pub fn new(bank: &Arc<Bank>, pubsub_addr: SocketAddr, exit: Arc<AtomicBool>) -> Self {
|
||||||
bank: &Arc<Bank>,
|
let rpc = RpcSolPubSubImpl::new(JsonRpcRequestProcessor::new(bank.clone()), bank.clone());
|
||||||
pubsub_addr: SocketAddr,
|
let thread_hdl = Builder::new()
|
||||||
path: Pubkey,
|
|
||||||
exit: Arc<AtomicBool>,
|
|
||||||
) -> Self {
|
|
||||||
let request_processor = JsonRpcRequestProcessor::new(bank.clone());
|
|
||||||
let status = Arc::new(Mutex::new(ClientState::Uninitialized));
|
|
||||||
let client_status = status.clone();
|
|
||||||
let server_bank = bank.clone();
|
|
||||||
let _thread_hdl = Builder::new()
|
|
||||||
.name("solana-pubsub".to_string())
|
.name("solana-pubsub".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
let mut io = PubSubHandler::default();
|
let mut io = PubSubHandler::default();
|
||||||
let rpc = RpcSolPubSubImpl::default();
|
|
||||||
let account_subs = rpc.account_subscriptions.clone();
|
|
||||||
let signature_subs = rpc.signature_subscriptions.clone();
|
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
|
|
||||||
let server = ServerBuilder::with_meta_extractor(io, move |context: &RequestContext|
|
let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
|
||||||
{
|
info!("New pubsub connection");
|
||||||
*client_status.lock().unwrap() = ClientState::Init(context.out.clone());
|
let session = Arc::new(Session::new(context.sender().clone()));
|
||||||
Meta {
|
session.on_drop(Box::new(|| {
|
||||||
request_processor: request_processor.clone(),
|
info!("Pubsub connection dropped");
|
||||||
session: Arc::new(Session::new(context.sender().clone())),
|
// Following should not be required as jsonrpc_pubsub will
|
||||||
}
|
// unsubscribe automatically once the websocket is dropped ...
|
||||||
})
|
/*
|
||||||
.request_middleware(move |req: &ws::Request|
|
for (_, (bank_sub_id, pubkey)) in self.account_subscriptions.read().unwrap().iter() {
|
||||||
if req.resource() != format!("/{}", path.to_string()) {
|
server_bank.remove_account_subscription(bank_sub_id, pubkey);
|
||||||
Some(ws::Response::new(403, "Client path incorrect or not provided"))
|
}
|
||||||
} else {
|
for (_, (bank_sub_id, signature)) in self.signature_subscriptions.read().unwrap().iter() {
|
||||||
None
|
server_bank.remove_signature_subscription(bank_sub_id, signature);
|
||||||
})
|
}
|
||||||
.start(&pubsub_addr);
|
*/
|
||||||
|
}));
|
||||||
|
session
|
||||||
|
})
|
||||||
|
.start(&pubsub_addr);
|
||||||
|
|
||||||
if server.is_err() {
|
if server.is_err() {
|
||||||
warn!("Pubsub service unavailable: unable to bind to port {}. \nMake sure this port is not already in use by another application", pubsub_addr.port());
|
warn!("Pubsub service unavailable: unable to bind to port {}. \nMake sure this port is not already in use by another application", pubsub_addr.port());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
while !exit.load(Ordering::Relaxed) {
|
while !exit.load(Ordering::Relaxed) {
|
||||||
if let ClientState::Init(ref mut sender) = *status.lock().unwrap() {
|
|
||||||
if sender.check_active().is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sleep(Duration::from_millis(100));
|
sleep(Duration::from_millis(100));
|
||||||
}
|
}
|
||||||
for (_, (bank_sub_id, pubkey)) in account_subs.read().unwrap().iter() {
|
|
||||||
server_bank.remove_account_subscription(bank_sub_id, pubkey);
|
|
||||||
}
|
|
||||||
for (_, (bank_sub_id, signature)) in signature_subs.read().unwrap().iter() {
|
|
||||||
server_bank.remove_signature_subscription(bank_sub_id, signature);
|
|
||||||
}
|
|
||||||
server.unwrap().close();
|
server.unwrap().close();
|
||||||
()
|
()
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
PubSubService { _thread_hdl }
|
PubSubService { thread_hdl }
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Meta {
|
|
||||||
pub request_processor: JsonRpcRequestProcessor,
|
|
||||||
pub session: Arc<Session>,
|
|
||||||
}
|
|
||||||
impl Metadata for Meta {}
|
|
||||||
impl PubSubMetadata for Meta {
|
|
||||||
fn session(&self) -> Option<Arc<Session>> {
|
|
||||||
Some(self.session.clone())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,21 +108,36 @@ build_rpc_trait! {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct RpcSolPubSubImpl {
|
struct RpcSolPubSubImpl {
|
||||||
uid: atomic::AtomicUsize,
|
uid: Arc<atomic::AtomicUsize>,
|
||||||
|
request_processor: JsonRpcRequestProcessor,
|
||||||
|
bank: Arc<Bank>,
|
||||||
account_subscriptions: Arc<RwLock<HashMap<SubscriptionId, (Pubkey, Pubkey)>>>,
|
account_subscriptions: Arc<RwLock<HashMap<SubscriptionId, (Pubkey, Pubkey)>>>,
|
||||||
signature_subscriptions: Arc<RwLock<HashMap<SubscriptionId, (Pubkey, Signature)>>>,
|
signature_subscriptions: Arc<RwLock<HashMap<SubscriptionId, (Pubkey, Signature)>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl RpcSolPubSubImpl {
|
||||||
|
fn new(request_processor: JsonRpcRequestProcessor, bank: Arc<Bank>) -> Self {
|
||||||
|
RpcSolPubSubImpl {
|
||||||
|
uid: Default::default(),
|
||||||
|
request_processor,
|
||||||
|
bank,
|
||||||
|
account_subscriptions: Default::default(),
|
||||||
|
signature_subscriptions: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl RpcSolPubSub for RpcSolPubSubImpl {
|
impl RpcSolPubSub for RpcSolPubSubImpl {
|
||||||
type Metadata = Meta;
|
type Metadata = Arc<Session>;
|
||||||
|
|
||||||
fn account_subscribe(
|
fn account_subscribe(
|
||||||
&self,
|
&self,
|
||||||
meta: Self::Metadata,
|
_meta: Self::Metadata,
|
||||||
subscriber: pubsub::Subscriber<Account>,
|
subscriber: pubsub::Subscriber<Account>,
|
||||||
pubkey_str: String,
|
pubkey_str: String,
|
||||||
) {
|
) {
|
||||||
|
info!("account_subscribe");
|
||||||
let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap();
|
let pubkey_vec = bs58::decode(pubkey_str).into_vec().unwrap();
|
||||||
if pubkey_vec.len() != mem::size_of::<Pubkey>() {
|
if pubkey_vec.len() != mem::size_of::<Pubkey>() {
|
||||||
subscriber
|
subscriber
|
||||||
|
@ -172,15 +159,15 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.insert(sub_id.clone(), (bank_sub_id, pubkey));
|
.insert(sub_id.clone(), (bank_sub_id, pubkey));
|
||||||
|
|
||||||
meta.request_processor
|
self.bank
|
||||||
.add_account_subscription(bank_sub_id, pubkey, sink);
|
.add_account_subscription(bank_sub_id, pubkey, sink);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn account_unsubscribe(&self, meta: Self::Metadata, id: SubscriptionId) -> Result<bool> {
|
fn account_unsubscribe(&self, _meta: Self::Metadata, id: SubscriptionId) -> Result<bool> {
|
||||||
|
info!("account_unsubscribe");
|
||||||
if let Some((bank_sub_id, pubkey)) = self.account_subscriptions.write().unwrap().remove(&id)
|
if let Some((bank_sub_id, pubkey)) = self.account_subscriptions.write().unwrap().remove(&id)
|
||||||
{
|
{
|
||||||
meta.request_processor
|
self.bank.remove_account_subscription(&bank_sub_id, &pubkey);
|
||||||
.remove_account_subscription(&bank_sub_id, &pubkey);
|
|
||||||
Ok(true)
|
Ok(true)
|
||||||
} else {
|
} else {
|
||||||
Err(Error {
|
Err(Error {
|
||||||
|
@ -193,10 +180,11 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
||||||
|
|
||||||
fn signature_subscribe(
|
fn signature_subscribe(
|
||||||
&self,
|
&self,
|
||||||
meta: Self::Metadata,
|
_meta: Self::Metadata,
|
||||||
subscriber: pubsub::Subscriber<RpcSignatureStatus>,
|
subscriber: pubsub::Subscriber<RpcSignatureStatus>,
|
||||||
signature_str: String,
|
signature_str: String,
|
||||||
) {
|
) {
|
||||||
|
info!("signature_subscribe");
|
||||||
let signature_vec = bs58::decode(signature_str).into_vec().unwrap();
|
let signature_vec = bs58::decode(signature_str).into_vec().unwrap();
|
||||||
if signature_vec.len() != mem::size_of::<Signature>() {
|
if signature_vec.len() != mem::size_of::<Signature>() {
|
||||||
subscriber
|
subscriber
|
||||||
|
@ -218,7 +206,7 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.insert(sub_id.clone(), (bank_sub_id, signature));
|
.insert(sub_id.clone(), (bank_sub_id, signature));
|
||||||
|
|
||||||
match meta.request_processor.get_signature_status(signature) {
|
match self.request_processor.get_signature_status(signature) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
sink.notify(Ok(RpcSignatureStatus::Confirmed))
|
sink.notify(Ok(RpcSignatureStatus::Confirmed))
|
||||||
.wait()
|
.wait()
|
||||||
|
@ -229,17 +217,18 @@ impl RpcSolPubSub for RpcSolPubSubImpl {
|
||||||
.remove(&sub_id);
|
.remove(&sub_id);
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
meta.request_processor
|
self.bank
|
||||||
.add_signature_subscription(bank_sub_id, signature, sink);
|
.add_signature_subscription(bank_sub_id, signature, sink);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn signature_unsubscribe(&self, meta: Self::Metadata, id: SubscriptionId) -> Result<bool> {
|
fn signature_unsubscribe(&self, _meta: Self::Metadata, id: SubscriptionId) -> Result<bool> {
|
||||||
|
info!("signature_unsubscribe");
|
||||||
if let Some((bank_sub_id, signature)) =
|
if let Some((bank_sub_id, signature)) =
|
||||||
self.signature_subscriptions.write().unwrap().remove(&id)
|
self.signature_subscriptions.write().unwrap().remove(&id)
|
||||||
{
|
{
|
||||||
meta.request_processor
|
self.bank
|
||||||
.remove_signature_subscription(&bank_sub_id, &signature);
|
.remove_signature_subscription(&bank_sub_id, &signature);
|
||||||
Ok(true)
|
Ok(true)
|
||||||
} else {
|
} else {
|
||||||
|
@ -270,10 +259,9 @@ mod tests {
|
||||||
let alice = Mint::new(10_000);
|
let alice = Mint::new(10_000);
|
||||||
let bank = Bank::new(&alice);
|
let bank = Bank::new(&alice);
|
||||||
let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
|
let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
|
||||||
let pubkey = Keypair::new().pubkey();
|
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr, pubkey, exit);
|
let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr, exit);
|
||||||
let thread = pubsub_service._thread_hdl.thread();
|
let thread = pubsub_service.thread_hdl.thread();
|
||||||
assert_eq!(thread.name().unwrap(), "solana-pubsub");
|
assert_eq!(thread.name().unwrap(), "solana-pubsub");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,17 +274,15 @@ mod tests {
|
||||||
let arc_bank = Arc::new(bank);
|
let arc_bank = Arc::new(bank);
|
||||||
let last_id = arc_bank.last_id();
|
let last_id = arc_bank.last_id();
|
||||||
|
|
||||||
let request_processor = JsonRpcRequestProcessor::new(arc_bank.clone());
|
|
||||||
let (sender, mut receiver) = mpsc::channel(1);
|
let (sender, mut receiver) = mpsc::channel(1);
|
||||||
let session = Arc::new(Session::new(sender));
|
let session = Arc::new(Session::new(sender));
|
||||||
|
|
||||||
let mut io = PubSubHandler::default();
|
let mut io = PubSubHandler::default();
|
||||||
let rpc = RpcSolPubSubImpl::default();
|
let rpc = RpcSolPubSubImpl::new(
|
||||||
|
JsonRpcRequestProcessor::new(arc_bank.clone()),
|
||||||
|
arc_bank.clone(),
|
||||||
|
);
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
let meta = Meta {
|
|
||||||
request_processor,
|
|
||||||
session,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Test signature subscription
|
// Test signature subscription
|
||||||
let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0);
|
let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0);
|
||||||
|
@ -305,7 +291,7 @@ mod tests {
|
||||||
r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#,
|
r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#,
|
||||||
tx.signature.to_string()
|
tx.signature.to_string()
|
||||||
);
|
);
|
||||||
let res = io.handle_request_sync(&req, meta.clone());
|
let res = io.handle_request_sync(&req, session.clone());
|
||||||
let expected = format!(r#"{{"jsonrpc":"2.0","result":0,"id":1}}"#);
|
let expected = format!(r#"{{"jsonrpc":"2.0","result":0,"id":1}}"#);
|
||||||
let expected: Response =
|
let expected: Response =
|
||||||
serde_json::from_str(&expected).expect("expected response deserialization");
|
serde_json::from_str(&expected).expect("expected response deserialization");
|
||||||
|
@ -318,7 +304,7 @@ mod tests {
|
||||||
let req = format!(
|
let req = format!(
|
||||||
r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["a1b2c3"]}}"#
|
r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["a1b2c3"]}}"#
|
||||||
);
|
);
|
||||||
let res = io.handle_request_sync(&req, meta.clone());
|
let res = io.handle_request_sync(&req, session.clone());
|
||||||
let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Invalid signature provided"}},"id":1}}"#);
|
let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Invalid signature provided"}},"id":1}}"#);
|
||||||
let expected: Response =
|
let expected: Response =
|
||||||
serde_json::from_str(&expected).expect("expected response deserialization");
|
serde_json::from_str(&expected).expect("expected response deserialization");
|
||||||
|
@ -346,7 +332,7 @@ mod tests {
|
||||||
r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#,
|
r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#,
|
||||||
tx.signature.to_string()
|
tx.signature.to_string()
|
||||||
);
|
);
|
||||||
let res = io.handle_request_sync(&req, meta.clone());
|
let res = io.handle_request_sync(&req, session.clone());
|
||||||
let expected = format!(r#"{{"jsonrpc":"2.0","result":1,"id":1}}"#);
|
let expected = format!(r#"{{"jsonrpc":"2.0","result":1,"id":1}}"#);
|
||||||
let expected: Response =
|
let expected: Response =
|
||||||
serde_json::from_str(&expected).expect("expected response deserialization");
|
serde_json::from_str(&expected).expect("expected response deserialization");
|
||||||
|
@ -364,28 +350,26 @@ mod tests {
|
||||||
let arc_bank = Arc::new(bank);
|
let arc_bank = Arc::new(bank);
|
||||||
let last_id = arc_bank.last_id();
|
let last_id = arc_bank.last_id();
|
||||||
|
|
||||||
let request_processor = JsonRpcRequestProcessor::new(arc_bank);
|
|
||||||
let (sender, _receiver) = mpsc::channel(1);
|
let (sender, _receiver) = mpsc::channel(1);
|
||||||
let session = Arc::new(Session::new(sender));
|
let session = Arc::new(Session::new(sender));
|
||||||
|
|
||||||
let mut io = PubSubHandler::default();
|
let mut io = PubSubHandler::default();
|
||||||
let rpc = RpcSolPubSubImpl::default();
|
let rpc = RpcSolPubSubImpl::new(
|
||||||
|
JsonRpcRequestProcessor::new(arc_bank.clone()),
|
||||||
|
arc_bank.clone(),
|
||||||
|
);
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
let meta = Meta {
|
|
||||||
request_processor,
|
|
||||||
session: session.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0);
|
let tx = Transaction::system_move(&alice.keypair(), bob_pubkey, 20, last_id, 0);
|
||||||
let req = format!(
|
let req = format!(
|
||||||
r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#,
|
r#"{{"jsonrpc":"2.0","id":1,"method":"signatureSubscribe","params":["{}"]}}"#,
|
||||||
tx.signature.to_string()
|
tx.signature.to_string()
|
||||||
);
|
);
|
||||||
let _res = io.handle_request_sync(&req, meta.clone());
|
let _res = io.handle_request_sync(&req, session.clone());
|
||||||
|
|
||||||
let req =
|
let req =
|
||||||
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[0]}}"#);
|
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[0]}}"#);
|
||||||
let res = io.handle_request_sync(&req, meta.clone());
|
let res = io.handle_request_sync(&req, session.clone());
|
||||||
|
|
||||||
let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#);
|
let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#);
|
||||||
let expected: Response =
|
let expected: Response =
|
||||||
|
@ -398,7 +382,7 @@ mod tests {
|
||||||
// Test bad parameter
|
// Test bad parameter
|
||||||
let req =
|
let req =
|
||||||
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[1]}}"#);
|
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"signatureUnsubscribe","params":[1]}}"#);
|
||||||
let res = io.handle_request_sync(&req, meta.clone());
|
let res = io.handle_request_sync(&req, session.clone());
|
||||||
let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#);
|
let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#);
|
||||||
let expected: Response =
|
let expected: Response =
|
||||||
serde_json::from_str(&expected).expect("expected response deserialization");
|
serde_json::from_str(&expected).expect("expected response deserialization");
|
||||||
|
@ -420,24 +404,22 @@ mod tests {
|
||||||
let arc_bank = Arc::new(bank);
|
let arc_bank = Arc::new(bank);
|
||||||
let last_id = arc_bank.last_id();
|
let last_id = arc_bank.last_id();
|
||||||
|
|
||||||
let request_processor = JsonRpcRequestProcessor::new(arc_bank.clone());
|
|
||||||
let (sender, mut receiver) = mpsc::channel(1);
|
let (sender, mut receiver) = mpsc::channel(1);
|
||||||
let session = Arc::new(Session::new(sender));
|
let session = Arc::new(Session::new(sender));
|
||||||
|
|
||||||
let mut io = PubSubHandler::default();
|
let mut io = PubSubHandler::default();
|
||||||
let rpc = RpcSolPubSubImpl::default();
|
let rpc = RpcSolPubSubImpl::new(
|
||||||
|
JsonRpcRequestProcessor::new(arc_bank.clone()),
|
||||||
|
arc_bank.clone(),
|
||||||
|
);
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
let meta = Meta {
|
|
||||||
request_processor,
|
|
||||||
session,
|
|
||||||
};
|
|
||||||
|
|
||||||
let req = format!(
|
let req = format!(
|
||||||
r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["{}"]}}"#,
|
r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["{}"]}}"#,
|
||||||
contract_state.pubkey().to_string()
|
contract_state.pubkey().to_string()
|
||||||
);
|
);
|
||||||
|
|
||||||
let res = io.handle_request_sync(&req, meta.clone());
|
let res = io.handle_request_sync(&req, session.clone());
|
||||||
let expected = format!(r#"{{"jsonrpc":"2.0","result":0,"id":1}}"#);
|
let expected = format!(r#"{{"jsonrpc":"2.0","result":0,"id":1}}"#);
|
||||||
let expected: Response =
|
let expected: Response =
|
||||||
serde_json::from_str(&expected).expect("expected response deserialization");
|
serde_json::from_str(&expected).expect("expected response deserialization");
|
||||||
|
@ -450,7 +432,7 @@ mod tests {
|
||||||
let req = format!(
|
let req = format!(
|
||||||
r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["a1b2c3"]}}"#
|
r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["a1b2c3"]}}"#
|
||||||
);
|
);
|
||||||
let res = io.handle_request_sync(&req, meta.clone());
|
let res = io.handle_request_sync(&req, session.clone());
|
||||||
let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Invalid pubkey provided"}},"id":1}}"#);
|
let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Invalid pubkey provided"}},"id":1}}"#);
|
||||||
let expected: Response =
|
let expected: Response =
|
||||||
serde_json::from_str(&expected).expect("expected response deserialization");
|
serde_json::from_str(&expected).expect("expected response deserialization");
|
||||||
|
@ -481,6 +463,7 @@ mod tests {
|
||||||
budget_program_id,
|
budget_program_id,
|
||||||
0,
|
0,
|
||||||
);
|
);
|
||||||
|
|
||||||
arc_bank
|
arc_bank
|
||||||
.process_transaction(&tx)
|
.process_transaction(&tx)
|
||||||
.expect("process transaction");
|
.expect("process transaction");
|
||||||
|
@ -488,10 +471,12 @@ mod tests {
|
||||||
// Test signature confirmation notification #1
|
// Test signature confirmation notification #1
|
||||||
let string = receiver.poll();
|
let string = receiver.poll();
|
||||||
assert!(string.is_ok());
|
assert!(string.is_ok());
|
||||||
|
|
||||||
let expected_userdata = arc_bank
|
let expected_userdata = arc_bank
|
||||||
.get_account(&contract_state.pubkey())
|
.get_account(&contract_state.pubkey())
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.userdata;
|
.userdata;
|
||||||
|
|
||||||
let expected = json!({
|
let expected = json!({
|
||||||
"jsonrpc": "2.0",
|
"jsonrpc": "2.0",
|
||||||
"method": "accountNotification",
|
"method": "accountNotification",
|
||||||
|
@ -593,27 +578,26 @@ mod tests {
|
||||||
let bank = Bank::new(&alice);
|
let bank = Bank::new(&alice);
|
||||||
let arc_bank = Arc::new(bank);
|
let arc_bank = Arc::new(bank);
|
||||||
|
|
||||||
let request_processor = JsonRpcRequestProcessor::new(arc_bank);
|
|
||||||
let (sender, _receiver) = mpsc::channel(1);
|
let (sender, _receiver) = mpsc::channel(1);
|
||||||
let session = Arc::new(Session::new(sender));
|
let session = Arc::new(Session::new(sender));
|
||||||
|
|
||||||
let mut io = PubSubHandler::default();
|
let mut io = PubSubHandler::default();
|
||||||
let rpc = RpcSolPubSubImpl::default();
|
let rpc = RpcSolPubSubImpl::new(
|
||||||
|
JsonRpcRequestProcessor::new(arc_bank.clone()),
|
||||||
|
arc_bank.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
io.extend_with(rpc.to_delegate());
|
io.extend_with(rpc.to_delegate());
|
||||||
let meta = Meta {
|
|
||||||
request_processor,
|
|
||||||
session: session.clone(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let req = format!(
|
let req = format!(
|
||||||
r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["{}"]}}"#,
|
r#"{{"jsonrpc":"2.0","id":1,"method":"accountSubscribe","params":["{}"]}}"#,
|
||||||
bob_pubkey.to_string()
|
bob_pubkey.to_string()
|
||||||
);
|
);
|
||||||
let _res = io.handle_request_sync(&req, meta.clone());
|
let _res = io.handle_request_sync(&req, session.clone());
|
||||||
|
|
||||||
let req =
|
let req =
|
||||||
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[0]}}"#);
|
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[0]}}"#);
|
||||||
let res = io.handle_request_sync(&req, meta.clone());
|
let res = io.handle_request_sync(&req, session.clone());
|
||||||
|
|
||||||
let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#);
|
let expected = format!(r#"{{"jsonrpc":"2.0","result":true,"id":1}}"#);
|
||||||
let expected: Response =
|
let expected: Response =
|
||||||
|
@ -626,7 +610,7 @@ mod tests {
|
||||||
// Test bad parameter
|
// Test bad parameter
|
||||||
let req =
|
let req =
|
||||||
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[1]}}"#);
|
format!(r#"{{"jsonrpc":"2.0","id":1,"method":"accountUnsubscribe","params":[1]}}"#);
|
||||||
let res = io.handle_request_sync(&req, meta.clone());
|
let res = io.handle_request_sync(&req, session.clone());
|
||||||
let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#);
|
let expected = format!(r#"{{"jsonrpc":"2.0","error":{{"code":-32602,"message":"Invalid Request: Subscription id does not exist"}},"id":1}}"#);
|
||||||
let expected: Response =
|
let expected: Response =
|
||||||
serde_json::from_str(&expected).expect("expected response deserialization");
|
serde_json::from_str(&expected).expect("expected response deserialization");
|
||||||
|
|
Loading…
Reference in New Issue