2023-07-20 04:45:54 -07:00
|
|
|
pub mod rpc_tester;
|
|
|
|
|
2023-01-09 22:40:43 -08:00
|
|
|
use std::time::Duration;
|
2022-11-12 05:32:01 -08:00
|
|
|
|
2023-08-31 03:34:13 -07:00
|
|
|
use anyhow::bail;
|
2022-11-30 07:56:41 -08:00
|
|
|
use clap::Parser;
|
2023-02-13 10:58:30 -08:00
|
|
|
use dotenv::dotenv;
|
2023-08-31 03:34:13 -07:00
|
|
|
use lite_rpc::postgres::Postgres;
|
|
|
|
use lite_rpc::service_spawner::ServiceSpawner;
|
2022-12-21 05:31:43 -08:00
|
|
|
use lite_rpc::{bridge::LiteBridge, cli::Args};
|
2023-08-31 05:26:46 -07:00
|
|
|
use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, GRPC_VERSION};
|
2023-07-20 04:45:54 -07:00
|
|
|
|
2023-08-31 03:34:13 -07:00
|
|
|
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
|
2023-08-31 04:56:33 -07:00
|
|
|
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
|
2023-08-31 03:34:13 -07:00
|
|
|
use solana_lite_rpc_cluster_endpoints::json_rpc_leaders_getter::JsonRpcLeaderGetter;
|
|
|
|
use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription;
|
|
|
|
use solana_lite_rpc_core::block_information_store::{BlockInformation, BlockInformationStore};
|
|
|
|
use solana_lite_rpc_core::cluster_info::ClusterInfo;
|
|
|
|
use solana_lite_rpc_core::data_cache::{DataCache, SlotCache};
|
2023-09-02 09:47:29 -07:00
|
|
|
use solana_lite_rpc_core::keypair_loader::load_identity_keypair;
|
2023-08-31 03:34:13 -07:00
|
|
|
use solana_lite_rpc_core::notifications::NotificationSender;
|
|
|
|
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
|
|
|
|
use solana_lite_rpc_core::streams::BlockStream;
|
|
|
|
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakes;
|
|
|
|
use solana_lite_rpc_core::structures::processed_block::ProcessedBlock;
|
|
|
|
use solana_lite_rpc_core::subscription_handler::SubscriptionHandler;
|
|
|
|
use solana_lite_rpc_core::tx_store::TxStore;
|
|
|
|
use solana_lite_rpc_core::AnyhowJoinHandle;
|
|
|
|
use solana_lite_rpc_services::data_caching_service::DataCachingService;
|
2023-08-02 06:22:59 -07:00
|
|
|
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
|
2023-08-31 03:34:13 -07:00
|
|
|
use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
|
|
|
|
use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
|
|
|
|
use solana_lite_rpc_services::tx_sender::TxSender;
|
|
|
|
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
|
|
|
|
use solana_sdk::commitment_config::CommitmentConfig;
|
2023-09-02 09:47:29 -07:00
|
|
|
use solana_sdk::signature::Keypair;
|
2023-08-31 03:34:13 -07:00
|
|
|
use solana_sdk::signer::Signer;
|
2023-02-13 10:58:30 -08:00
|
|
|
use std::env;
|
2023-09-06 09:28:21 -07:00
|
|
|
use std::net::{SocketAddr, ToSocketAddrs};
|
2023-07-21 09:08:32 -07:00
|
|
|
use std::sync::Arc;
|
2023-08-31 03:34:13 -07:00
|
|
|
use tokio::sync::mpsc;
|
2023-02-13 10:58:30 -08:00
|
|
|
|
2023-07-20 04:45:54 -07:00
|
|
|
use crate::rpc_tester::RpcTester;
|
|
|
|
|
2023-08-31 03:34:13 -07:00
|
|
|
async fn get_latest_block(
|
|
|
|
mut block_stream: BlockStream,
|
|
|
|
commitment_config: CommitmentConfig,
|
|
|
|
) -> ProcessedBlock {
|
|
|
|
while let Ok(block) = block_stream.recv().await {
|
|
|
|
if block.commitment_config == commitment_config {
|
|
|
|
return block;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
panic!("Did not recv blocks");
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn start_postgres(
|
|
|
|
enable: bool,
|
|
|
|
) -> anyhow::Result<(Option<NotificationSender>, AnyhowJoinHandle)> {
|
|
|
|
if !enable {
|
|
|
|
return Ok((
|
|
|
|
None,
|
|
|
|
tokio::spawn(async {
|
|
|
|
std::future::pending::<()>().await;
|
|
|
|
unreachable!()
|
|
|
|
}),
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
let (postgres_send, postgres_recv) = mpsc::unbounded_channel();
|
|
|
|
|
|
|
|
let postgres = Postgres::new().await?.start(postgres_recv);
|
|
|
|
|
|
|
|
Ok((Some(postgres_send), postgres))
|
|
|
|
}
|
|
|
|
|
2023-09-07 07:46:14 -07:00
|
|
|
pub async fn start_lite_rpc(args: Args, rpc_client: Arc<RpcClient>) -> anyhow::Result<()> {
|
2022-12-16 18:35:49 -08:00
|
|
|
let Args {
|
2023-01-05 02:54:00 -08:00
|
|
|
lite_rpc_ws_addr,
|
|
|
|
lite_rpc_http_addr,
|
2023-01-10 07:45:30 -08:00
|
|
|
fanout_size,
|
2023-01-31 05:30:52 -08:00
|
|
|
enable_postgres,
|
2023-02-04 03:45:20 -08:00
|
|
|
prometheus_addr,
|
2023-02-08 13:18:09 -08:00
|
|
|
identity_keypair,
|
2023-04-18 05:22:51 -07:00
|
|
|
maximum_retries_per_tx,
|
|
|
|
transaction_retry_after_secs,
|
2023-09-02 05:12:49 -07:00
|
|
|
quic_proxy_addr,
|
2023-08-31 04:56:33 -07:00
|
|
|
use_grpc,
|
|
|
|
grpc_addr,
|
2023-08-31 03:34:13 -07:00
|
|
|
..
|
2023-07-20 04:45:54 -07:00
|
|
|
} = args;
|
2023-05-09 07:52:51 -07:00
|
|
|
|
2023-09-02 09:47:29 -07:00
|
|
|
let validator_identity = Arc::new(
|
|
|
|
load_identity_keypair(&identity_keypair)
|
|
|
|
.await
|
|
|
|
.unwrap_or_else(Keypair::new),
|
|
|
|
);
|
2023-05-13 06:52:43 -07:00
|
|
|
|
2023-05-09 07:52:51 -07:00
|
|
|
let retry_after = Duration::from_secs(transaction_retry_after_secs);
|
2023-06-11 17:15:40 -07:00
|
|
|
|
2023-09-02 05:12:49 -07:00
|
|
|
let tpu_connection_path = configure_tpu_connection_path(quic_proxy_addr);
|
2023-07-26 14:33:49 -07:00
|
|
|
|
2023-08-31 05:26:46 -07:00
|
|
|
let (subscriptions, cluster_endpoint_tasks) = if use_grpc {
|
2023-08-31 04:56:33 -07:00
|
|
|
create_grpc_subscription(rpc_client.clone(), grpc_addr, GRPC_VERSION.to_string())?
|
|
|
|
} else {
|
|
|
|
create_json_rpc_polling_subscription(rpc_client.clone())?
|
|
|
|
};
|
2023-08-31 03:34:13 -07:00
|
|
|
let EndpointStreaming {
|
|
|
|
blocks_notifier,
|
|
|
|
cluster_info_notifier,
|
|
|
|
slot_notifier,
|
|
|
|
vote_account_notifier,
|
|
|
|
} = subscriptions;
|
|
|
|
let finalized_block =
|
|
|
|
get_latest_block(blocks_notifier.resubscribe(), CommitmentConfig::finalized()).await;
|
|
|
|
|
|
|
|
let block_store = BlockInformationStore::new(BlockInformation::from_block(&finalized_block));
|
|
|
|
let data_cache = DataCache {
|
|
|
|
block_store,
|
|
|
|
cluster_info: ClusterInfo::default(),
|
|
|
|
identity_stakes: IdentityStakes::new(validator_identity.pubkey()),
|
|
|
|
slot_cache: SlotCache::new(finalized_block.slot),
|
|
|
|
tx_subs: SubscriptionHandler::default(),
|
|
|
|
txs: TxStore::default(),
|
|
|
|
};
|
|
|
|
|
|
|
|
let lata_cache_service = DataCachingService {
|
|
|
|
data_cache: data_cache.clone(),
|
|
|
|
clean_duration: Duration::from_secs(120),
|
|
|
|
};
|
2023-08-31 04:56:33 -07:00
|
|
|
|
|
|
|
// to avoid laggin we resubscribe to block notification
|
2023-08-31 03:34:13 -07:00
|
|
|
let data_caching_service = lata_cache_service.listen(
|
2023-08-31 04:56:33 -07:00
|
|
|
blocks_notifier.resubscribe(),
|
2023-08-31 03:34:13 -07:00
|
|
|
slot_notifier.resubscribe(),
|
|
|
|
cluster_info_notifier,
|
|
|
|
vote_account_notifier,
|
|
|
|
);
|
2023-08-31 04:56:33 -07:00
|
|
|
drop(blocks_notifier);
|
2023-08-31 03:34:13 -07:00
|
|
|
|
|
|
|
let (notification_channel, postgres) = start_postgres(enable_postgres).await?;
|
|
|
|
|
|
|
|
let tpu_config = TpuServiceConfig {
|
|
|
|
fanout_slots: fanout_size,
|
|
|
|
number_of_leaders_to_cache: 1024,
|
|
|
|
clusterinfo_refresh_time: Duration::from_secs(60 * 60),
|
|
|
|
leader_schedule_update_frequency: Duration::from_secs(10),
|
|
|
|
maximum_transaction_in_queue: 20000,
|
|
|
|
maximum_number_of_errors: 10,
|
|
|
|
quic_connection_params: QuicConnectionParameters {
|
|
|
|
connection_timeout: Duration::from_secs(1),
|
|
|
|
connection_retry_count: 10,
|
|
|
|
finalize_timeout: Duration::from_millis(200),
|
2023-09-08 03:56:16 -07:00
|
|
|
max_number_of_connections: 8,
|
2023-08-31 03:34:13 -07:00
|
|
|
unistream_timeout: Duration::from_millis(500),
|
|
|
|
write_timeout: Duration::from_secs(1),
|
2023-09-08 03:56:16 -07:00
|
|
|
number_of_transactions_per_unistream: 1,
|
2023-08-31 03:34:13 -07:00
|
|
|
},
|
2023-08-02 06:22:59 -07:00
|
|
|
tpu_connection_path,
|
2023-08-31 03:34:13 -07:00
|
|
|
};
|
|
|
|
|
|
|
|
let spawner = ServiceSpawner {
|
2023-06-11 17:15:40 -07:00
|
|
|
prometheus_addr,
|
2023-08-31 03:34:13 -07:00
|
|
|
data_cache: data_cache.clone(),
|
|
|
|
};
|
|
|
|
let leader_schedule = Arc::new(JsonRpcLeaderGetter::new(rpc_client.clone(), 1024, 128));
|
|
|
|
|
|
|
|
let tpu_service: TpuService = TpuService::new(
|
|
|
|
tpu_config,
|
|
|
|
validator_identity,
|
|
|
|
leader_schedule,
|
|
|
|
data_cache.clone(),
|
2023-07-20 04:45:54 -07:00
|
|
|
)
|
2023-08-31 03:34:13 -07:00
|
|
|
.await?;
|
|
|
|
let tx_sender = TxSender::new(data_cache.clone(), tpu_service.clone());
|
|
|
|
let tx_replayer =
|
|
|
|
TransactionReplayer::new(tpu_service.clone(), data_cache.txs.clone(), retry_after);
|
|
|
|
let (transaction_service, tx_service_jh) = spawner.spawn_tx_service(
|
|
|
|
tx_sender,
|
|
|
|
tx_replayer,
|
|
|
|
tpu_service,
|
2023-08-31 04:56:33 -07:00
|
|
|
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
|
2023-08-31 03:34:13 -07:00
|
|
|
notification_channel.clone(),
|
|
|
|
maximum_retries_per_tx,
|
2023-08-31 04:56:33 -07:00
|
|
|
slot_notifier.resubscribe(),
|
2023-08-31 03:34:13 -07:00
|
|
|
);
|
2023-08-31 04:56:33 -07:00
|
|
|
drop(slot_notifier);
|
2023-08-31 03:34:13 -07:00
|
|
|
|
|
|
|
let support_service = tokio::spawn(async move { spawner.spawn_support_services().await });
|
|
|
|
|
|
|
|
let bridge_service = tokio::spawn(
|
|
|
|
LiteBridge::new(rpc_client.clone(), data_cache.clone(), transaction_service)
|
|
|
|
.start(lite_rpc_http_addr, lite_rpc_ws_addr),
|
|
|
|
);
|
|
|
|
tokio::select! {
|
|
|
|
res = tx_service_jh => {
|
|
|
|
anyhow::bail!("Tx Services {res:?}")
|
|
|
|
}
|
|
|
|
res = support_service => {
|
|
|
|
anyhow::bail!("Support Services {res:?}")
|
|
|
|
}
|
|
|
|
res = bridge_service => {
|
|
|
|
anyhow::bail!("Server {res:?}")
|
|
|
|
}
|
|
|
|
res = postgres => {
|
|
|
|
anyhow::bail!("Postgres service {res:?}");
|
|
|
|
}
|
|
|
|
res = futures::future::select_all(data_caching_service) => {
|
|
|
|
anyhow::bail!("Data caching service failed {res:?}")
|
|
|
|
}
|
|
|
|
res = futures::future::select_all(cluster_endpoint_tasks) => {
|
|
|
|
anyhow::bail!("cluster endpoint failure {res:?}")
|
|
|
|
}
|
|
|
|
}
|
2023-07-20 04:45:54 -07:00
|
|
|
}
|
2022-12-16 18:35:49 -08:00
|
|
|
|
2023-07-20 04:45:54 -07:00
|
|
|
fn get_args() -> Args {
|
|
|
|
let mut args = Args::parse();
|
|
|
|
|
|
|
|
dotenv().ok();
|
|
|
|
|
|
|
|
args.enable_postgres = args.enable_postgres
|
|
|
|
|| if let Ok(enable_postgres_env_var) = env::var("PG_ENABLED") {
|
|
|
|
enable_postgres_env_var != "false"
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
};
|
|
|
|
|
|
|
|
args
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
|
|
|
|
pub async fn main() -> anyhow::Result<()> {
|
|
|
|
tracing_subscriber::fmt::init();
|
|
|
|
|
|
|
|
let args = get_args();
|
2023-06-23 00:16:57 -07:00
|
|
|
|
2022-12-16 18:35:49 -08:00
|
|
|
let ctrl_c_signal = tokio::signal::ctrl_c();
|
2023-09-07 07:46:14 -07:00
|
|
|
let Args { rpc_addr, .. } = &args;
|
|
|
|
// rpc client
|
|
|
|
let rpc_client = Arc::new(RpcClient::new(rpc_addr.clone()));
|
|
|
|
let rpc_tester = tokio::spawn(RpcTester::new(rpc_client.clone()).start());
|
2023-07-20 04:45:54 -07:00
|
|
|
|
2023-09-07 07:46:14 -07:00
|
|
|
let main = start_lite_rpc(args.clone(), rpc_client);
|
2022-12-16 18:35:49 -08:00
|
|
|
|
|
|
|
tokio::select! {
|
2023-07-20 04:45:54 -07:00
|
|
|
err = rpc_tester => {
|
2023-09-06 09:52:08 -07:00
|
|
|
log::error!("{err:?}");
|
|
|
|
Ok(())
|
2023-07-20 04:45:54 -07:00
|
|
|
}
|
|
|
|
res = main => {
|
|
|
|
// This should never happen
|
|
|
|
log::error!("Services quit unexpectedly {res:?}");
|
|
|
|
bail!("")
|
|
|
|
}
|
2023-01-31 09:02:50 -08:00
|
|
|
_ = ctrl_c_signal => {
|
2023-07-20 04:45:54 -07:00
|
|
|
log::info!("Received ctrl+c signal");
|
2023-06-11 17:46:50 -07:00
|
|
|
Ok(())
|
2022-12-16 18:35:49 -08:00
|
|
|
}
|
2022-11-25 01:22:12 -08:00
|
|
|
}
|
2022-12-07 07:05:18 -08:00
|
|
|
}
|
2023-08-02 04:32:01 -07:00
|
|
|
|
2023-09-02 05:12:49 -07:00
|
|
|
fn configure_tpu_connection_path(quic_proxy_addr: Option<String>) -> TpuConnectionPath {
|
|
|
|
match quic_proxy_addr {
|
2023-08-02 04:32:01 -07:00
|
|
|
None => TpuConnectionPath::QuicDirectPath,
|
2023-09-06 09:28:21 -07:00
|
|
|
Some(prox_address) => {
|
|
|
|
let proxy_socket_addr = parse_host_port_to_ipv4(prox_address.as_str()).unwrap();
|
|
|
|
TpuConnectionPath::QuicForwardProxyPath {
|
2023-09-06 09:30:47 -07:00
|
|
|
// e.g. "127.0.0.1:11111" or "localhost:11111"
|
2023-09-06 09:28:21 -07:00
|
|
|
forward_proxy_address: proxy_socket_addr,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn parse_host_port_to_ipv4(host_port: &str) -> Result<SocketAddr, String> {
|
|
|
|
let addrs: Vec<_> = host_port
|
|
|
|
.to_socket_addrs()
|
|
|
|
.map_err(|err| format!("Unable to resolve host {host_port}: {err}"))?
|
|
|
|
.filter(|addr| addr.is_ipv4())
|
|
|
|
.collect();
|
|
|
|
if addrs.is_empty() {
|
|
|
|
Err(format!("Unable to resolve host: {host_port}"))
|
|
|
|
} else if addrs.len() > 1 {
|
|
|
|
Err(format!("Multiple addresses resolved for host: {host_port}"))
|
|
|
|
} else {
|
|
|
|
Ok(addrs[0])
|
2023-08-02 04:32:01 -07:00
|
|
|
}
|
2023-08-02 06:22:59 -07:00
|
|
|
}
|