Fanout size
This commit is contained in:
parent
0a3700b0bd
commit
a4392cee2d
|
@ -17,6 +17,7 @@ use solana_client::{
|
||||||
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient, tpu_client::TpuClient},
|
nonblocking::{pubsub_client::PubsubClient, rpc_client::RpcClient, tpu_client::TpuClient},
|
||||||
rpc_config::{RpcContextConfig, RpcRequestAirdropConfig},
|
rpc_config::{RpcContextConfig, RpcRequestAirdropConfig},
|
||||||
rpc_response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
|
rpc_response::{Response as RpcResponse, RpcBlockhash, RpcResponseContext, RpcVersionInfo},
|
||||||
|
tpu_client::TpuClientConfig,
|
||||||
};
|
};
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
commitment_config::{CommitmentConfig, CommitmentLevel},
|
commitment_config::{CommitmentConfig, CommitmentLevel},
|
||||||
|
@ -38,11 +39,21 @@ pub struct LiteBridge {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LiteBridge {
|
impl LiteBridge {
|
||||||
pub async fn new(rpc_url: reqwest::Url, ws_addr: &str) -> anyhow::Result<Self> {
|
pub async fn new(
|
||||||
|
rpc_url: reqwest::Url,
|
||||||
|
ws_addr: &str,
|
||||||
|
fanout_slots: u64,
|
||||||
|
) -> anyhow::Result<Self> {
|
||||||
let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string()));
|
let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string()));
|
||||||
|
|
||||||
let tpu_client =
|
let tpu_client = Arc::new(
|
||||||
Arc::new(TpuClient::new(rpc_client.clone(), ws_addr, Default::default()).await?);
|
TpuClient::new(
|
||||||
|
rpc_client.clone(),
|
||||||
|
ws_addr,
|
||||||
|
TpuClientConfig { fanout_slots },
|
||||||
|
)
|
||||||
|
.await?,
|
||||||
|
);
|
||||||
|
|
||||||
let pub_sub_client = Arc::new(PubsubClient::new(ws_addr).await?);
|
let pub_sub_client = Arc::new(PubsubClient::new(ws_addr).await?);
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS,
|
DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS,
|
||||||
DEFAULT_TX_BATCH_SIZE, DEFAULT_WS_ADDR,
|
DEFAULT_TX_BATCH_SIZE, DEFAULT_WS_ADDR,
|
||||||
};
|
};
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
|
@ -18,6 +18,9 @@ pub struct Args {
|
||||||
/// batch size of each batch forward
|
/// batch size of each batch forward
|
||||||
#[arg(short = 'b', long, default_value_t = DEFAULT_TX_BATCH_SIZE)]
|
#[arg(short = 'b', long, default_value_t = DEFAULT_TX_BATCH_SIZE)]
|
||||||
pub tx_batch_size: usize,
|
pub tx_batch_size: usize,
|
||||||
|
/// tpu fanout
|
||||||
|
#[arg(short = 'f', long, default_value_t = DEFAULT_FANOUT_SIZE) ]
|
||||||
|
pub fanout_size: u64,
|
||||||
/// interval between each batch forward
|
/// interval between each batch forward
|
||||||
#[arg(short = 'i', long, default_value_t = DEFAULT_TX_BATCH_INTERVAL_MS)]
|
#[arg(short = 'i', long, default_value_t = DEFAULT_TX_BATCH_INTERVAL_MS)]
|
||||||
pub tx_batch_interval_ms: u64,
|
pub tx_batch_interval_ms: u64,
|
||||||
|
|
|
@ -20,6 +20,8 @@ pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
|
||||||
#[from_env]
|
#[from_env]
|
||||||
pub const DEFAULT_TX_BATCH_SIZE: usize = 1 << 7;
|
pub const DEFAULT_TX_BATCH_SIZE: usize = 1 << 7;
|
||||||
#[from_env]
|
#[from_env]
|
||||||
|
pub const DEFAULT_FANOUT_SIZE: u64 = 32;
|
||||||
|
#[from_env]
|
||||||
pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 1;
|
pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64 = 1;
|
||||||
#[from_env]
|
#[from_env]
|
||||||
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
|
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
|
||||||
|
|
|
@ -24,12 +24,14 @@ pub async fn main() -> anyhow::Result<()> {
|
||||||
lite_rpc_http_addr,
|
lite_rpc_http_addr,
|
||||||
tx_batch_interval_ms,
|
tx_batch_interval_ms,
|
||||||
clean_interval_ms,
|
clean_interval_ms,
|
||||||
|
fanout_size,
|
||||||
} = Args::parse();
|
} = Args::parse();
|
||||||
|
|
||||||
let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms);
|
let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms);
|
||||||
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
|
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
|
||||||
|
|
||||||
let light_bridge = LiteBridge::new(Url::from_str(&rpc_addr).unwrap(), &ws_addr).await?;
|
let light_bridge =
|
||||||
|
LiteBridge::new(Url::from_str(&rpc_addr).unwrap(), &ws_addr, fanout_size).await?;
|
||||||
|
|
||||||
let services = light_bridge
|
let services = light_bridge
|
||||||
.start_services(
|
.start_services(
|
||||||
|
|
Loading…
Reference in New Issue