Adding configuration to make number of accounts fetched by gma (#8)
This commit is contained in:
parent
1bd6d1aa1a
commit
20fd5e70c2
|
@ -29,6 +29,7 @@ pub async fn main() {
|
||||||
grpc_sources: Some(vec![]),
|
grpc_sources: Some(vec![]),
|
||||||
dedup_queue_size: 0,
|
dedup_queue_size: 0,
|
||||||
request_timeout_in_seconds: None,
|
request_timeout_in_seconds: None,
|
||||||
|
number_of_accounts_per_gma: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Raydium
|
// Raydium
|
||||||
|
|
|
@ -125,6 +125,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.unwrap_or_else(|| panic!("did not find a source config for region {}", region));
|
.unwrap_or_else(|| panic!("did not find a source config for region {}", region));
|
||||||
|
|
||||||
let rpc = build_rpc(&source_config);
|
let rpc = build_rpc(&source_config);
|
||||||
|
let number_of_accounts_per_gma = source_config.number_of_accounts_per_gma.unwrap_or(100);
|
||||||
|
|
||||||
// handle sigint
|
// handle sigint
|
||||||
let exit_flag: Arc<atomic::AtomicBool> = Arc::new(atomic::AtomicBool::new(false));
|
let exit_flag: Arc<atomic::AtomicBool> = Arc::new(atomic::AtomicBool::new(false));
|
||||||
|
@ -313,7 +314,12 @@ async fn main() -> anyhow::Result<()> {
|
||||||
info!("Using {} mints", mints.len(),);
|
info!("Using {} mints", mints.len(),);
|
||||||
|
|
||||||
let token_cache = {
|
let token_cache = {
|
||||||
let mint_metadata = request_mint_metadata(&source_config.rpc_http_url, &mints).await;
|
let mint_metadata = request_mint_metadata(
|
||||||
|
&source_config.rpc_http_url,
|
||||||
|
&mints,
|
||||||
|
number_of_accounts_per_gma,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
let mut data: HashMap<Pubkey, token_cache::Decimals> = HashMap::new();
|
let mut data: HashMap<Pubkey, token_cache::Decimals> = HashMap::new();
|
||||||
for (mint_pubkey, Token { mint, decimals }) in mint_metadata {
|
for (mint_pubkey, Token { mint, decimals }) in mint_metadata {
|
||||||
assert_eq!(mint_pubkey, mint);
|
assert_eq!(mint_pubkey, mint);
|
||||||
|
|
|
@ -43,7 +43,6 @@ use yellowstone_grpc_proto::geyser::{
|
||||||
use yellowstone_grpc_proto::tonic::codec::CompressionEncoding;
|
use yellowstone_grpc_proto::tonic::codec::CompressionEncoding;
|
||||||
|
|
||||||
const MAX_GRPC_ACCOUNT_SUBSCRIPTIONS: usize = 100;
|
const MAX_GRPC_ACCOUNT_SUBSCRIPTIONS: usize = 100;
|
||||||
const MAX_GMA_ACCOUNTS: usize = 100;
|
|
||||||
|
|
||||||
// limit number of concurrent gMA/gPA requests
|
// limit number of concurrent gMA/gPA requests
|
||||||
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4;
|
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4;
|
||||||
|
@ -72,6 +71,7 @@ pub async fn feed_data_geyser(
|
||||||
sender: async_channel::Sender<SourceMessage>,
|
sender: async_channel::Sender<SourceMessage>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false);
|
let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false);
|
||||||
|
let number_of_accounts_per_gma = snapshot_config.number_of_accounts_per_gma.unwrap_or(100);
|
||||||
let grpc_connection_string = match &grpc_config.connection_string.chars().next().unwrap() {
|
let grpc_connection_string = match &grpc_config.connection_string.chars().next().unwrap() {
|
||||||
'$' => env::var(&grpc_config.connection_string[1..])
|
'$' => env::var(&grpc_config.connection_string[1..])
|
||||||
.expect("reading connection string from env"),
|
.expect("reading connection string from env"),
|
||||||
|
@ -336,7 +336,7 @@ pub async fn feed_data_geyser(
|
||||||
let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS));
|
let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS));
|
||||||
|
|
||||||
info!("Requesting snapshot from gMA for {} filter accounts", accounts_filter.len());
|
info!("Requesting snapshot from gMA for {} filter accounts", accounts_filter.len());
|
||||||
for pubkey_chunk in accounts_filter.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() {
|
for pubkey_chunk in accounts_filter.iter().chunks(number_of_accounts_per_gma).into_iter() {
|
||||||
let rpc_http_url = snapshot_rpc_http_url.clone();
|
let rpc_http_url = snapshot_rpc_http_url.clone();
|
||||||
let account_ids = pubkey_chunk.cloned().collect_vec();
|
let account_ids = pubkey_chunk.cloned().collect_vec();
|
||||||
let sender = snapshot_gma_sender.clone();
|
let sender = snapshot_gma_sender.clone();
|
||||||
|
|
|
@ -16,7 +16,6 @@ use std::time::Instant;
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
use tracing::{info, trace};
|
use tracing::{info, trace};
|
||||||
|
|
||||||
const MAX_GMA_ACCOUNTS: usize = 100;
|
|
||||||
// 4: 388028 mints -> 61 sec
|
// 4: 388028 mints -> 61 sec
|
||||||
// 16: 388028 mints -> 35 sec
|
// 16: 388028 mints -> 35 sec
|
||||||
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 16;
|
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 16;
|
||||||
|
@ -30,6 +29,7 @@ pub struct Token {
|
||||||
pub async fn request_mint_metadata(
|
pub async fn request_mint_metadata(
|
||||||
rpc_http_url: &str,
|
rpc_http_url: &str,
|
||||||
mint_account_ids: &HashSet<Pubkey>,
|
mint_account_ids: &HashSet<Pubkey>,
|
||||||
|
max_gma_accounts: usize,
|
||||||
) -> HashMap<Pubkey, Token> {
|
) -> HashMap<Pubkey, Token> {
|
||||||
info!(
|
info!(
|
||||||
"Requesting data for mint accounts via chunked gMA for {} pubkey ..",
|
"Requesting data for mint accounts via chunked gMA for {} pubkey ..",
|
||||||
|
@ -51,7 +51,7 @@ pub async fn request_mint_metadata(
|
||||||
|
|
||||||
let mut threads = Vec::new();
|
let mut threads = Vec::new();
|
||||||
let count = Arc::new(AtomicU64::new(0));
|
let count = Arc::new(AtomicU64::new(0));
|
||||||
for pubkey_chunk in mint_account_ids.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() {
|
for pubkey_chunk in mint_account_ids.iter().chunks(max_gma_accounts).into_iter() {
|
||||||
let pubkey_chunk = pubkey_chunk.into_iter().cloned().collect_vec();
|
let pubkey_chunk = pubkey_chunk.into_iter().cloned().collect_vec();
|
||||||
let count = count.clone();
|
let count = count.clone();
|
||||||
let rpc_client = rpc_client.clone();
|
let rpc_client = rpc_client.clone();
|
||||||
|
|
|
@ -26,8 +26,6 @@ use router_feed_lib::get_program_account::{
|
||||||
use solana_program::clock::Slot;
|
use solana_program::clock::Slot;
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
const MAX_GMA_ACCOUNTS: usize = 100;
|
|
||||||
|
|
||||||
// limit number of concurrent gMA/gPA requests
|
// limit number of concurrent gMA/gPA requests
|
||||||
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4;
|
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4;
|
||||||
|
|
||||||
|
@ -46,6 +44,7 @@ pub async fn feed_data_geyser(
|
||||||
sender: async_channel::Sender<SourceMessage>,
|
sender: async_channel::Sender<SourceMessage>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false);
|
let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false);
|
||||||
|
let number_of_accounts_per_gma = snapshot_config.number_of_accounts_per_gma.unwrap_or(100);
|
||||||
|
|
||||||
let snapshot_rpc_http_url = match &snapshot_config.rpc_http_url.chars().next().unwrap() {
|
let snapshot_rpc_http_url = match &snapshot_config.rpc_http_url.chars().next().unwrap() {
|
||||||
'$' => env::var(&snapshot_config.rpc_http_url[1..])
|
'$' => env::var(&snapshot_config.rpc_http_url[1..])
|
||||||
|
@ -194,7 +193,7 @@ pub async fn feed_data_geyser(
|
||||||
let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS));
|
let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS));
|
||||||
|
|
||||||
info!("Requesting snapshot from gMA for {} filter accounts", subscribed_accounts.len());
|
info!("Requesting snapshot from gMA for {} filter accounts", subscribed_accounts.len());
|
||||||
for pubkey_chunk in subscribed_accounts.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() {
|
for pubkey_chunk in subscribed_accounts.iter().chunks(number_of_accounts_per_gma).into_iter() {
|
||||||
let rpc_http_url = snapshot_rpc_http_url.clone();
|
let rpc_http_url = snapshot_rpc_http_url.clone();
|
||||||
let account_ids = pubkey_chunk.map(|x| *x).collect_vec();
|
let account_ids = pubkey_chunk.map(|x| *x).collect_vec();
|
||||||
let sender = snapshot_gma_sender.clone();
|
let sender = snapshot_gma_sender.clone();
|
||||||
|
|
|
@ -96,6 +96,7 @@ pub struct AccountDataSourceConfig {
|
||||||
pub grpc_sources: Option<Vec<GrpcSourceConfig>>,
|
pub grpc_sources: Option<Vec<GrpcSourceConfig>>,
|
||||||
pub dedup_queue_size: usize,
|
pub dedup_queue_size: usize,
|
||||||
pub request_timeout_in_seconds: Option<u64>,
|
pub request_timeout_in_seconds: Option<u64>,
|
||||||
|
pub number_of_accounts_per_gma: Option<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, serde_derive::Deserialize)]
|
#[derive(Clone, Debug, serde_derive::Deserialize)]
|
||||||
|
|
Loading…
Reference in New Issue