diff --git a/bin/autobahn-router/examples/grpc_source_tester.rs b/bin/autobahn-router/examples/grpc_source_tester.rs index fe9ae2b..c0fa690 100644 --- a/bin/autobahn-router/examples/grpc_source_tester.rs +++ b/bin/autobahn-router/examples/grpc_source_tester.rs @@ -29,6 +29,7 @@ pub async fn main() { grpc_sources: Some(vec![]), dedup_queue_size: 0, request_timeout_in_seconds: None, + number_of_accounts_per_gma: None, }; // Raydium diff --git a/bin/autobahn-router/src/main.rs b/bin/autobahn-router/src/main.rs index 118cb19..9fe7037 100644 --- a/bin/autobahn-router/src/main.rs +++ b/bin/autobahn-router/src/main.rs @@ -125,6 +125,7 @@ async fn main() -> anyhow::Result<()> { .unwrap_or_else(|| panic!("did not find a source config for region {}", region)); let rpc = build_rpc(&source_config); + let number_of_accounts_per_gma = source_config.number_of_accounts_per_gma.unwrap_or(100); // handle sigint let exit_flag: Arc = Arc::new(atomic::AtomicBool::new(false)); @@ -313,7 +314,12 @@ async fn main() -> anyhow::Result<()> { info!("Using {} mints", mints.len(),); let token_cache = { - let mint_metadata = request_mint_metadata(&source_config.rpc_http_url, &mints).await; + let mint_metadata = request_mint_metadata( + &source_config.rpc_http_url, + &mints, + number_of_accounts_per_gma, + ) + .await; let mut data: HashMap = HashMap::new(); for (mint_pubkey, Token { mint, decimals }) in mint_metadata { assert_eq!(mint_pubkey, mint); diff --git a/bin/autobahn-router/src/source/grpc_plugin_source.rs b/bin/autobahn-router/src/source/grpc_plugin_source.rs index 6684b96..11bbe89 100644 --- a/bin/autobahn-router/src/source/grpc_plugin_source.rs +++ b/bin/autobahn-router/src/source/grpc_plugin_source.rs @@ -43,7 +43,6 @@ use yellowstone_grpc_proto::geyser::{ use yellowstone_grpc_proto::tonic::codec::CompressionEncoding; const MAX_GRPC_ACCOUNT_SUBSCRIPTIONS: usize = 100; -const MAX_GMA_ACCOUNTS: usize = 100; // limit number of concurrent gMA/gPA requests const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4; @@ -72,6 +71,7 @@ pub async fn feed_data_geyser( sender: async_channel::Sender, ) -> anyhow::Result<()> { let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false); + let number_of_accounts_per_gma = snapshot_config.number_of_accounts_per_gma.unwrap_or(100); let grpc_connection_string = match &grpc_config.connection_string.chars().next().unwrap() { '$' => env::var(&grpc_config.connection_string[1..]) .expect("reading connection string from env"), @@ -336,7 +336,7 @@ pub async fn feed_data_geyser( let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS)); info!("Requesting snapshot from gMA for {} filter accounts", accounts_filter.len()); - for pubkey_chunk in accounts_filter.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() { + for pubkey_chunk in accounts_filter.iter().chunks(number_of_accounts_per_gma).into_iter() { let rpc_http_url = snapshot_rpc_http_url.clone(); let account_ids = pubkey_chunk.cloned().collect_vec(); let sender = snapshot_gma_sender.clone(); diff --git a/bin/autobahn-router/src/source/mint_accounts_source.rs b/bin/autobahn-router/src/source/mint_accounts_source.rs index ec3a942..84f9155 100644 --- a/bin/autobahn-router/src/source/mint_accounts_source.rs +++ b/bin/autobahn-router/src/source/mint_accounts_source.rs @@ -16,7 +16,6 @@ use std::time::Instant; use tokio::sync::Semaphore; use tracing::{info, trace}; -const MAX_GMA_ACCOUNTS: usize = 100; // 4: 388028 mints -> 61 sec // 16: 388028 mints -> 35 sec const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 16; @@ -30,6 +29,7 @@ pub struct Token { pub async fn request_mint_metadata( rpc_http_url: &str, mint_account_ids: &HashSet, + max_gma_accounts: usize, ) -> HashMap { info!( "Requesting data for mint accounts via chunked gMA for {} pubkey ..", @@ -51,7 +51,7 @@ pub async fn request_mint_metadata( let mut threads = Vec::new(); let count = Arc::new(AtomicU64::new(0)); - for pubkey_chunk in mint_account_ids.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() { + for pubkey_chunk in mint_account_ids.iter().chunks(max_gma_accounts).into_iter() { let pubkey_chunk = pubkey_chunk.into_iter().cloned().collect_vec(); let count = count.clone(); let rpc_client = rpc_client.clone(); diff --git a/bin/autobahn-router/src/source/quic_plugin_source.rs b/bin/autobahn-router/src/source/quic_plugin_source.rs index f2df0ef..fe9482c 100644 --- a/bin/autobahn-router/src/source/quic_plugin_source.rs +++ b/bin/autobahn-router/src/source/quic_plugin_source.rs @@ -26,8 +26,6 @@ use router_feed_lib::get_program_account::{ use solana_program::clock::Slot; use tokio::sync::Semaphore; -const MAX_GMA_ACCOUNTS: usize = 100; - // limit number of concurrent gMA/gPA requests const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4; @@ -46,6 +44,7 @@ pub async fn feed_data_geyser( sender: async_channel::Sender, ) -> anyhow::Result<()> { let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false); + let number_of_accounts_per_gma = snapshot_config.number_of_accounts_per_gma.unwrap_or(100); let snapshot_rpc_http_url = match &snapshot_config.rpc_http_url.chars().next().unwrap() { '$' => env::var(&snapshot_config.rpc_http_url[1..]) @@ -194,7 +193,7 @@ pub async fn feed_data_geyser( let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS)); info!("Requesting snapshot from gMA for {} filter accounts", subscribed_accounts.len()); - for pubkey_chunk in subscribed_accounts.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() { + for pubkey_chunk in subscribed_accounts.iter().chunks(number_of_accounts_per_gma).into_iter() { let rpc_http_url = snapshot_rpc_http_url.clone(); let account_ids = pubkey_chunk.map(|x| *x).collect_vec(); let sender = snapshot_gma_sender.clone(); diff --git a/lib/router-config-lib/src/lib.rs b/lib/router-config-lib/src/lib.rs index cabe96c..5fde856 100644 --- a/lib/router-config-lib/src/lib.rs +++ b/lib/router-config-lib/src/lib.rs @@ -96,6 +96,7 @@ pub struct AccountDataSourceConfig { pub grpc_sources: Option>, pub dedup_queue_size: usize, pub request_timeout_in_seconds: Option, + pub number_of_accounts_per_gma: Option, } #[derive(Clone, Debug, serde_derive::Deserialize)]