adding identity to the tpu client, connection cache

This commit is contained in:
Godmode Galactus 2023-02-08 22:18:09 +01:00
parent c92ce5074d
commit 62207bb2ab
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
8 changed files with 283 additions and 24 deletions

212
Cargo.lock generated
View File

@ -98,6 +98,15 @@ dependencies = [
"libc",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.69"
@ -634,6 +643,21 @@ dependencies = [
"inout",
]
[[package]]
name = "clap"
version = "2.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c"
dependencies = [
"ansi_term",
"atty",
"bitflags",
"strsim 0.8.0",
"textwrap 0.11.0",
"unicode-width",
"vec_map",
]
[[package]]
name = "clap"
version = "3.2.23"
@ -645,9 +669,9 @@ dependencies = [
"clap_lex 0.2.4",
"indexmap",
"once_cell",
"strsim",
"strsim 0.10.0",
"termcolor",
"textwrap",
"textwrap 0.16.0",
]
[[package]]
@ -661,7 +685,7 @@ dependencies = [
"clap_lex 0.3.1",
"is-terminal",
"once_cell",
"strsim",
"strsim 0.10.0",
"termcolor",
]
@ -996,7 +1020,7 @@ dependencies = [
"ident_case",
"proc-macro2 1.0.51",
"quote 1.0.23",
"strsim",
"strsim 0.10.0",
"syn 1.0.107",
]
@ -1070,6 +1094,18 @@ dependencies = [
"syn 1.0.107",
]
[[package]]
name = "dialoguer"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af3c796f3b0b408d9fd581611b47fa850821fcb84aa640b83a3c1a5be2d691f2"
dependencies = [
"console",
"shell-words",
"tempfile",
"zeroize",
]
[[package]]
name = "digest"
version = "0.9.0"
@ -1226,6 +1262,18 @@ dependencies = [
"syn 1.0.107",
]
[[package]]
name = "enum_dispatch"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11f36e95862220b211a6e2aa5eca09b4fa391b13cd52ceb8035a24bf65a79de2"
dependencies = [
"once_cell",
"proc-macro2 1.0.51",
"quote 1.0.23",
"syn 1.0.107",
]
[[package]]
name = "env_logger"
version = "0.9.3"
@ -2205,6 +2253,7 @@ dependencies = [
"prometheus",
"serde",
"serde_json",
"solana-client",
"solana-pubsub-client",
"solana-quic-client",
"solana-rpc-client",
@ -3216,6 +3265,27 @@ dependencies = [
"winapi",
]
[[package]]
name = "rpassword"
version = "7.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6678cf63ab3491898c0d021b493c94c9b221d91295294a2a5746eacbe5928322"
dependencies = [
"libc",
"rtoolbox",
"winapi",
]
[[package]]
name = "rtoolbox"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "034e22c514f5c0cb8a10ff341b9b048b5ceb21591f31c8f44c43b960f9b3524a"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "rustc-demangle"
version = "0.1.21"
@ -3571,6 +3641,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shell-words"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde"
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -3686,6 +3762,56 @@ dependencies = [
"thiserror",
]
[[package]]
name = "solana-clap-utils"
version = "1.15.0"
source = "git+https://github.com/blockworks-foundation/solana?branch=lite_rpc#04daf93c153bdce7f461ad284843c21c63ca621f"
dependencies = [
"chrono",
"clap 2.34.0",
"rpassword",
"solana-perf",
"solana-remote-wallet",
"solana-sdk 1.15.0 (git+https://github.com/blockworks-foundation/solana?branch=lite_rpc)",
"thiserror",
"tiny-bip39",
"uriparse",
"url",
]
[[package]]
name = "solana-client"
version = "1.15.0"
source = "git+https://github.com/blockworks-foundation/solana?branch=lite_rpc#04daf93c153bdce7f461ad284843c21c63ca621f"
dependencies = [
"async-trait",
"bincode",
"enum_dispatch",
"futures",
"futures-util",
"indexmap",
"indicatif",
"log",
"quinn",
"rand 0.7.3",
"rayon",
"solana-measure",
"solana-metrics",
"solana-net-utils",
"solana-pubsub-client",
"solana-quic-client",
"solana-rpc-client",
"solana-rpc-client-api",
"solana-rpc-client-nonce-utils",
"solana-sdk 1.15.0 (git+https://github.com/blockworks-foundation/solana?branch=lite_rpc)",
"solana-streamer",
"solana-thin-client",
"solana-tpu-client",
"solana-udp-client",
"thiserror",
"tokio",
]
[[package]]
name = "solana-config-program"
version = "1.15.0"
@ -4072,6 +4198,24 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "solana-remote-wallet"
version = "1.15.0"
source = "git+https://github.com/blockworks-foundation/solana?branch=lite_rpc#04daf93c153bdce7f461ad284843c21c63ca621f"
dependencies = [
"console",
"dialoguer",
"log",
"num-derive",
"num-traits",
"parking_lot",
"qstring",
"semver 1.0.16",
"solana-sdk 1.15.0 (git+https://github.com/blockworks-foundation/solana?branch=lite_rpc)",
"thiserror",
"uriparse",
]
[[package]]
name = "solana-rpc-client"
version = "1.15.0"
@ -4118,6 +4262,18 @@ dependencies = [
"thiserror",
]
[[package]]
name = "solana-rpc-client-nonce-utils"
version = "1.15.0"
source = "git+https://github.com/blockworks-foundation/solana?branch=lite_rpc#04daf93c153bdce7f461ad284843c21c63ca621f"
dependencies = [
"clap 2.34.0",
"solana-clap-utils",
"solana-rpc-client",
"solana-sdk 1.15.0 (git+https://github.com/blockworks-foundation/solana?branch=lite_rpc)",
"thiserror",
]
[[package]]
name = "solana-sdk"
version = "1.15.0"
@ -4278,6 +4434,19 @@ dependencies = [
"x509-parser",
]
[[package]]
name = "solana-thin-client"
version = "1.15.0"
source = "git+https://github.com/blockworks-foundation/solana?branch=lite_rpc#04daf93c153bdce7f461ad284843c21c63ca621f"
dependencies = [
"bincode",
"log",
"solana-rpc-client",
"solana-rpc-client-api",
"solana-sdk 1.15.0 (git+https://github.com/blockworks-foundation/solana?branch=lite_rpc)",
"solana-tpu-client",
]
[[package]]
name = "solana-tpu-client"
version = "1.15.0"
@ -4327,6 +4496,20 @@ dependencies = [
"thiserror",
]
[[package]]
name = "solana-udp-client"
version = "1.15.0"
source = "git+https://github.com/blockworks-foundation/solana?branch=lite_rpc#04daf93c153bdce7f461ad284843c21c63ca621f"
dependencies = [
"async-trait",
"solana-net-utils",
"solana-sdk 1.15.0 (git+https://github.com/blockworks-foundation/solana?branch=lite_rpc)",
"solana-streamer",
"solana-tpu-client",
"thiserror",
"tokio",
]
[[package]]
name = "solana-version"
version = "1.15.0"
@ -4497,6 +4680,12 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "strsim"
version = "0.10.0"
@ -4566,6 +4755,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
"unicode-width",
]
[[package]]
name = "textwrap"
version = "0.16.0"
@ -5042,6 +5240,12 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "vec_map"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "version_check"
version = "0.9.4"

View File

@ -21,6 +21,7 @@ solana-quic-client= { git = "https://github.com/blockworks-foundation/solana", b
solana-pubsub-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-transaction-status = { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-version= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
solana-client= { git = "https://github.com/blockworks-foundation/solana", branch="lite_rpc" }
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.92"
tokio = { version = "1.25.0", features = ["full"]}

View File

@ -24,8 +24,9 @@ use solana_rpc_client_api::{
config::{RpcContextConfig, RpcRequestAirdropConfig, RpcSignatureStatusConfig},
response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
};
use solana_sdk::clock::MAX_RECENT_BLOCKHASHES;
use solana_sdk::{
commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey,
commitment_config::CommitmentConfig, hash::Hash, pubkey::Pubkey, signature::Keypair,
transaction::VersionedTransaction,
};
use solana_transaction_status::TransactionStatus;
@ -34,7 +35,6 @@ use tokio::{
sync::mpsc::{self, UnboundedSender},
task::JoinHandle,
};
use solana_sdk::clock::MAX_RECENT_BLOCKHASHES;
lazy_static::lazy_static! {
static ref RPC_SEND_TX: Counter =
@ -66,11 +66,16 @@ pub struct LiteBridge {
}
impl LiteBridge {
pub async fn new(rpc_url: String, ws_addr: String, fanout_slots: u64) -> anyhow::Result<Self> {
pub async fn new(
rpc_url: String,
ws_addr: String,
fanout_slots: u64,
identity: Keypair,
) -> anyhow::Result<Self> {
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
let tpu_manager =
Arc::new(TpuManager::new(rpc_client.clone(), ws_addr, fanout_slots).await?);
Arc::new(TpuManager::new(rpc_client.clone(), ws_addr, fanout_slots, identity).await?);
let tx_sender = TxSender::new(tpu_manager.clone());

View File

@ -33,4 +33,6 @@ pub struct Args {
/// enable metrics to prometheus at addr
#[arg(short = 'm', long, default_value_t = String::from("[::]:9091"))]
pub prometheus_addr: String,
#[arg(short = 'k', long, default_value_t = String::new())]
pub identity_keypair: String,
}

View File

@ -4,6 +4,7 @@ use anyhow::bail;
use clap::Parser;
use lite_rpc::{bridge::LiteBridge, cli::Args};
use log::info;
use solana_sdk::signature::Keypair;
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
@ -20,12 +21,23 @@ pub async fn main() -> anyhow::Result<()> {
fanout_size,
enable_postgres,
prometheus_addr,
identity_keypair,
} = Args::parse();
let identity = if identity_keypair.is_empty() {
Keypair::new()
} else {
let identity_file = tokio::fs::read_to_string(identity_keypair.as_str())
.await
.expect("Cannot find the identity file provided");
let identity_bytes: Vec<u8> = serde_json::from_str(&identity_file)?;
Keypair::from_bytes(identity_bytes.as_slice())?
};
let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms);
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
let light_bridge = LiteBridge::new(rpc_addr, ws_addr, fanout_size).await?;
let light_bridge = LiteBridge::new(rpc_addr, ws_addr, fanout_size, identity).await?;
let services = light_bridge
.start_services(

View File

@ -1,15 +1,26 @@
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
use std::{
net::{IpAddr, Ipv4Addr},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
use log::info;
use solana_quic_client::QuicPool;
use solana_quic_client::{QuicConfig, QuicPool};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_tpu_client::{nonblocking::tpu_client::TpuClient, tpu_client::TpuClientConfig};
use tokio::sync::{RwLock, RwLockReadGuard};
use solana_sdk::signature::Keypair;
use solana_tpu_client::{
nonblocking::tpu_client::TpuClient,
tpu_client::TpuClientConfig,
tpu_connection_cache::{NewTpuConfig, TpuConnectionCache},
};
use tokio::sync::RwLock;
pub type QuicTpuClient = TpuClient<QuicPool>;
pub type QuicConnectionCache = TpuConnectionCache<QuicPool>;
const TPU_CONNECTION_CACHE_SIZE: usize = 8;
#[derive(Clone)]
pub struct TpuManager {
@ -18,6 +29,7 @@ pub struct TpuManager {
tpu_client: Arc<RwLock<QuicTpuClient>>,
pub ws_addr: String,
fanout_slots: u64,
connection_cache: Arc<QuicConnectionCache>,
}
impl TpuManager {
@ -25,8 +37,23 @@ impl TpuManager {
rpc_client: Arc<RpcClient>,
ws_addr: String,
fanout_slots: u64,
identity: Keypair,
) -> anyhow::Result<Self> {
let tpu_client = Self::new_tpu_client(rpc_client.clone(), &ws_addr, fanout_slots).await?;
let mut tpu_config = QuicConfig::new().unwrap();
tpu_config
.update_client_certificate(&identity, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))
.unwrap();
let connection_cache =
QuicConnectionCache::new_with_config(TPU_CONNECTION_CACHE_SIZE, tpu_config);
let connection_cache = Arc::new(connection_cache);
let tpu_client = Self::new_tpu_client(
rpc_client.clone(),
&ws_addr,
fanout_slots,
connection_cache.clone(),
)
.await?;
let tpu_client = Arc::new(RwLock::new(tpu_client));
Ok(Self {
@ -35,6 +62,7 @@ impl TpuManager {
ws_addr,
fanout_slots,
error_count: Default::default(),
connection_cache: connection_cache,
})
}
@ -42,11 +70,13 @@ impl TpuManager {
rpc_client: Arc<RpcClient>,
ws_addr: &str,
fanout_slots: u64,
connection_cache: Arc<QuicConnectionCache>,
) -> anyhow::Result<QuicTpuClient> {
Ok(TpuClient::new(
Ok(TpuClient::new_with_connection_cache(
rpc_client.clone(),
ws_addr,
TpuClientConfig { fanout_slots },
connection_cache,
)
.await?)
}
@ -55,9 +85,13 @@ impl TpuManager {
self.error_count.fetch_add(1, Ordering::Relaxed);
if self.error_count.load(Ordering::Relaxed) > 5 {
let tpu_client =
Self::new_tpu_client(self.rpc_client.clone(), &self.ws_addr, self.fanout_slots)
.await?;
let tpu_client = Self::new_tpu_client(
self.rpc_client.clone(),
&self.ws_addr,
self.fanout_slots,
self.connection_cache.clone(),
)
.await?;
self.error_count.store(0, Ordering::Relaxed);
*self.tpu_client.write().await = tpu_client;
info!("TPU Reset after 5 errors");
@ -85,7 +119,7 @@ impl TpuManager {
}
}
pub async fn get_tpu_client(&self) -> RwLockReadGuard<QuicTpuClient> {
self.tpu_client.read().await
pub async fn estimated_current_slot(&self) -> u64 {
self.tpu_client.read().await.estimated_current_slot()
}
}

View File

@ -95,7 +95,7 @@ impl TxSender {
};
if let Some(postgres) = postgres {
let forwarded_slot = tpu_client.get_tpu_client().await.estimated_current_slot();
let forwarded_slot: u64 = tpu_client.estimated_current_slot().await;
for (sig, recent_slot) in sigs_and_slots {
postgres

View File

@ -12,7 +12,7 @@ use lite_rpc::{
};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
use solana_transaction_status::TransactionConfirmationStatus;
use tokio::sync::mpsc;
@ -27,6 +27,7 @@ async fn send_and_confirm_txs() {
rpc_client.clone(),
DEFAULT_WS_ADDR.into(),
Default::default(),
Keypair::new(),
)
.await
.unwrap(),