Adding configuration to make number of accounts fetched by gma
This commit is contained in:
parent
1bd6d1aa1a
commit
06004c11bb
|
@ -29,6 +29,7 @@ pub async fn main() {
|
|||
grpc_sources: Some(vec![]),
|
||||
dedup_queue_size: 0,
|
||||
request_timeout_in_seconds: None,
|
||||
number_of_accounts_per_gma: None,
|
||||
};
|
||||
|
||||
// Raydium
|
||||
|
|
|
@ -125,6 +125,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
.unwrap_or_else(|| panic!("did not find a source config for region {}", region));
|
||||
|
||||
let rpc = build_rpc(&source_config);
|
||||
let number_of_accounts_per_gma = source_config.number_of_accounts_per_gma.unwrap_or(100);
|
||||
|
||||
// handle sigint
|
||||
let exit_flag: Arc<atomic::AtomicBool> = Arc::new(atomic::AtomicBool::new(false));
|
||||
|
@ -313,7 +314,12 @@ async fn main() -> anyhow::Result<()> {
|
|||
info!("Using {} mints", mints.len(),);
|
||||
|
||||
let token_cache = {
|
||||
let mint_metadata = request_mint_metadata(&source_config.rpc_http_url, &mints).await;
|
||||
let mint_metadata = request_mint_metadata(
|
||||
&source_config.rpc_http_url,
|
||||
&mints,
|
||||
number_of_accounts_per_gma,
|
||||
)
|
||||
.await;
|
||||
let mut data: HashMap<Pubkey, token_cache::Decimals> = HashMap::new();
|
||||
for (mint_pubkey, Token { mint, decimals }) in mint_metadata {
|
||||
assert_eq!(mint_pubkey, mint);
|
||||
|
|
|
@ -43,7 +43,6 @@ use yellowstone_grpc_proto::geyser::{
|
|||
use yellowstone_grpc_proto::tonic::codec::CompressionEncoding;
|
||||
|
||||
const MAX_GRPC_ACCOUNT_SUBSCRIPTIONS: usize = 100;
|
||||
const MAX_GMA_ACCOUNTS: usize = 100;
|
||||
|
||||
// limit number of concurrent gMA/gPA requests
|
||||
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4;
|
||||
|
@ -72,6 +71,7 @@ pub async fn feed_data_geyser(
|
|||
sender: async_channel::Sender<SourceMessage>,
|
||||
) -> anyhow::Result<()> {
|
||||
let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false);
|
||||
let number_of_accounts_per_gma = snapshot_config.number_of_accounts_per_gma.unwrap_or(100);
|
||||
let grpc_connection_string = match &grpc_config.connection_string.chars().next().unwrap() {
|
||||
'$' => env::var(&grpc_config.connection_string[1..])
|
||||
.expect("reading connection string from env"),
|
||||
|
@ -336,7 +336,7 @@ pub async fn feed_data_geyser(
|
|||
let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS));
|
||||
|
||||
info!("Requesting snapshot from gMA for {} filter accounts", accounts_filter.len());
|
||||
for pubkey_chunk in accounts_filter.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() {
|
||||
for pubkey_chunk in accounts_filter.iter().chunks(number_of_accounts_per_gma).into_iter() {
|
||||
let rpc_http_url = snapshot_rpc_http_url.clone();
|
||||
let account_ids = pubkey_chunk.cloned().collect_vec();
|
||||
let sender = snapshot_gma_sender.clone();
|
||||
|
|
|
@ -16,7 +16,6 @@ use std::time::Instant;
|
|||
use tokio::sync::Semaphore;
|
||||
use tracing::{info, trace};
|
||||
|
||||
const MAX_GMA_ACCOUNTS: usize = 100;
|
||||
// 4: 388028 mints -> 61 sec
|
||||
// 16: 388028 mints -> 35 sec
|
||||
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 16;
|
||||
|
@ -30,6 +29,7 @@ pub struct Token {
|
|||
pub async fn request_mint_metadata(
|
||||
rpc_http_url: &str,
|
||||
mint_account_ids: &HashSet<Pubkey>,
|
||||
max_gma_accounts: usize,
|
||||
) -> HashMap<Pubkey, Token> {
|
||||
info!(
|
||||
"Requesting data for mint accounts via chunked gMA for {} pubkey ..",
|
||||
|
@ -51,7 +51,7 @@ pub async fn request_mint_metadata(
|
|||
|
||||
let mut threads = Vec::new();
|
||||
let count = Arc::new(AtomicU64::new(0));
|
||||
for pubkey_chunk in mint_account_ids.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() {
|
||||
for pubkey_chunk in mint_account_ids.iter().chunks(max_gma_accounts).into_iter() {
|
||||
let pubkey_chunk = pubkey_chunk.into_iter().cloned().collect_vec();
|
||||
let count = count.clone();
|
||||
let rpc_client = rpc_client.clone();
|
||||
|
|
|
@ -26,8 +26,6 @@ use router_feed_lib::get_program_account::{
|
|||
use solana_program::clock::Slot;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
const MAX_GMA_ACCOUNTS: usize = 100;
|
||||
|
||||
// limit number of concurrent gMA/gPA requests
|
||||
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4;
|
||||
|
||||
|
@ -46,6 +44,7 @@ pub async fn feed_data_geyser(
|
|||
sender: async_channel::Sender<SourceMessage>,
|
||||
) -> anyhow::Result<()> {
|
||||
let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false);
|
||||
let number_of_accounts_per_gma = snapshot_config.number_of_accounts_per_gma.unwrap_or(100);
|
||||
|
||||
let snapshot_rpc_http_url = match &snapshot_config.rpc_http_url.chars().next().unwrap() {
|
||||
'$' => env::var(&snapshot_config.rpc_http_url[1..])
|
||||
|
@ -194,7 +193,7 @@ pub async fn feed_data_geyser(
|
|||
let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS));
|
||||
|
||||
info!("Requesting snapshot from gMA for {} filter accounts", subscribed_accounts.len());
|
||||
for pubkey_chunk in subscribed_accounts.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() {
|
||||
for pubkey_chunk in subscribed_accounts.iter().chunks(number_of_accounts_per_gma).into_iter() {
|
||||
let rpc_http_url = snapshot_rpc_http_url.clone();
|
||||
let account_ids = pubkey_chunk.map(|x| *x).collect_vec();
|
||||
let sender = snapshot_gma_sender.clone();
|
||||
|
|
|
@ -96,6 +96,7 @@ pub struct AccountDataSourceConfig {
|
|||
pub grpc_sources: Option<Vec<GrpcSourceConfig>>,
|
||||
pub dedup_queue_size: usize,
|
||||
pub request_timeout_in_seconds: Option<u64>,
|
||||
pub number_of_accounts_per_gma: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, serde_derive::Deserialize)]
|
||||
|
|
Loading…
Reference in New Issue