Fixing accounts on demand (#371)

* Fixing accounts on demand

* Adding missing file and renaming gprc to grpc

* Adding missing grpc_accounts_streaming
This commit is contained in:
galactus 2024-03-26 14:32:50 +01:00 committed by GitHub
parent 3c994597b4
commit 29bd6deae4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 244 additions and 377 deletions

View File

@ -1,7 +1,8 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use dashmap::DashSet;
use futures::lock::Mutex;
use itertools::Itertools;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_client::{
@ -9,7 +10,9 @@ use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::RpcFilterType,
};
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
use solana_lite_rpc_accounts::account_store_interface::{
AccountLoadingError, AccountStorageInterface,
};
use solana_lite_rpc_cluster_endpoints::geyser_grpc_connector::GrpcSourceConfig;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
@ -19,7 +22,7 @@ use solana_lite_rpc_core::{
},
};
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use tokio::sync::{broadcast::Sender, RwLock};
use tokio::sync::{broadcast::Sender, Notify, RwLock};
use crate::subscription_manager::SubscriptionManger;
@ -31,12 +34,15 @@ lazy_static::lazy_static! {
register_int_gauge!(opts!("literpc_number_of_program_filters_on_demand", "Number of program filters on demand")).unwrap();
}
const RETRY_FETCHING_ACCOUNT: usize = 10;
pub struct AccountsOnDemand {
rpc_client: Arc<RpcClient>,
accounts_storage: Arc<dyn AccountStorageInterface>,
accounts_subscribed: Arc<DashSet<Pubkey>>,
program_filters: Arc<RwLock<AccountFilters>>,
subscription_manager: SubscriptionManger,
accounts_is_loading: Arc<Mutex<HashMap<Pubkey, Arc<Notify>>>>,
}
impl AccountsOnDemand {
@ -56,6 +62,7 @@ impl AccountsOnDemand {
accounts_storage,
account_notification_sender,
),
accounts_is_loading: Arc::new(Mutex::new(HashMap::new())),
}
}
@ -102,52 +109,99 @@ impl AccountStorageInterface for AccountsOnDemand {
.await
}
async fn get_account(&self, account_pk: Pubkey, commitment: Commitment) -> Option<AccountData> {
async fn get_account(
&self,
account_pk: Pubkey,
commitment: Commitment,
) -> Result<Option<AccountData>, AccountLoadingError> {
match self
.accounts_storage
.get_account(account_pk, commitment)
.await
.await?
{
Some(account_data) => Some(account_data),
Some(account_data) => Ok(Some(account_data)),
None => {
// account does not exist in account store
// first check if we have already subscribed to the required account
// This is to avoid resetting geyser subscription because of accounts that do not exists.
if !self.accounts_subscribed.contains(&account_pk) {
// get account from rpc and create its subscription
self.accounts_subscribed.insert(account_pk);
self.refresh_subscription().await;
let account_response = self
.rpc_client
.get_account_with_commitment(
&account_pk,
commitment.into_commiment_config(),
let mut lk = self.accounts_is_loading.lock().await;
match lk.get(&account_pk).cloned() {
Some(loading_account) => {
drop(lk);
match tokio::time::timeout(
Duration::from_secs(10),
loading_account.notified(),
)
.await;
if let Ok(response) = account_response {
match response.value {
Some(account) => {
// update account in storage and return the account data
let account_data = AccountData {
pubkey: account_pk,
account: Arc::new(account),
updated_slot: response.context.slot,
};
.await
{
Ok(_) => {
self.accounts_storage
.update_account(account_data.clone(), commitment)
.await;
Some(account_data)
.get_account(account_pk, commitment)
.await
}
// account does not exist
None => None,
Err(_timeout) => Err(AccountLoadingError::OperationTimeOut),
}
}
None => {
// account is not loading
if self.accounts_subscribed.contains(&account_pk) {
// account was already tried to be loaded but does not exists
Ok(None)
} else {
// update account loading map
// create a notify for accounts under loading
lk.insert(account_pk, Arc::new(Notify::new()));
self.accounts_subscribed.insert(account_pk);
drop(lk);
self.refresh_subscription().await;
let mut return_value = None;
for _ in 0..RETRY_FETCHING_ACCOUNT {
let account_response = self
.rpc_client
.get_account_with_commitment(
&account_pk,
commitment.into_commiment_config(),
)
.await;
match account_response {
Ok(response) => {
if let Some(account) = response.value {
// update account in storage and return the account data
let account_data = AccountData {
pubkey: account_pk,
account: Arc::new(account),
updated_slot: response.context.slot,
};
self.accounts_storage
.update_account(account_data.clone(), commitment)
.await;
return_value = Some(account_data);
break;
} else {
// account does not exist
break;
}
}
Err(e) => {
log::error!(
"Error fetching account {} {e:?}",
account_pk.to_string()
);
}
}
}
// update loading lock
{
let mut write_lock = self.accounts_is_loading.lock().await;
let notify = write_lock.remove(&account_pk);
drop(write_lock);
if let Some(notify) = notify {
notify.notify_waiters();
}
}
Ok(return_value)
}
} else {
// issue getting account, will then be updated by geyser
None
}
} else {
// we have already subscribed to the account and it does not exist
None
}
}
}

View File

@ -1,33 +1,22 @@
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use itertools::Itertools;
use merge_streams::MergeStreams;
use prometheus::{opts, register_int_gauge, IntGauge};
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
use solana_lite_rpc_cluster_endpoints::geyser_grpc_connector::GrpcSourceConfig;
use solana_lite_rpc_cluster_endpoints::{
geyser_grpc_connector::GrpcSourceConfig,
grpc::grpc_accounts_streaming::start_account_streaming_tasks,
};
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::{
account_data::{AccountData, AccountNotificationMessage, AccountStream},
account_filter::{AccountFilterType, AccountFilters, MemcmpFilterData},
account_data::{AccountNotificationMessage, AccountStream},
account_filter::AccountFilters,
},
AnyhowJoinHandle,
};
use solana_sdk::{account::Account, pubkey::Pubkey};
use tokio::sync::{
broadcast::{self, Sender},
watch, Notify,
};
use yellowstone_grpc_proto::geyser::{
subscribe_request_filter_accounts_filter::Filter,
subscribe_request_filter_accounts_filter_memcmp::Data, subscribe_update::UpdateOneof,
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter,
SubscribeRequestFilterAccountsFilterMemcmp,
watch,
};
lazy_static::lazy_static! {
@ -89,212 +78,6 @@ impl SubscriptionManger {
}
}
pub fn start_account_streaming_task(
grpc_config: GrpcSourceConfig,
accounts_filters: AccountFilters,
account_stream_sx: broadcast::Sender<AccountNotificationMessage>,
has_started: Arc<Notify>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
'main_loop: loop {
let processed_commitment = yellowstone_grpc_proto::geyser::CommitmentLevel::Processed;
let mut subscribe_programs: HashMap<String, SubscribeRequestFilterAccounts> =
HashMap::new();
let mut accounts_to_subscribe = HashSet::new();
for (index, accounts_filter) in accounts_filters.iter().enumerate() {
if !accounts_filter.accounts.is_empty() {
accounts_filter.accounts.iter().for_each(|account| {
accounts_to_subscribe.insert(account.clone());
});
}
if let Some(program_id) = &accounts_filter.program_id {
let filters = if let Some(filters) = &accounts_filter.filters {
filters
.iter()
.map(|filter| match filter {
AccountFilterType::Datasize(size) => {
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::Datasize(*size)),
}
}
AccountFilterType::Memcmp(memcmp) => {
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::Memcmp(
SubscribeRequestFilterAccountsFilterMemcmp {
offset: memcmp.offset,
data: Some(match &memcmp.data {
MemcmpFilterData::Bytes(bytes) => {
Data::Bytes(bytes.clone())
}
MemcmpFilterData::Base58(data) => {
Data::Base58(data.clone())
}
MemcmpFilterData::Base64(data) => {
Data::Base64(data.clone())
}
}),
},
)),
}
}
AccountFilterType::TokenAccountState => {
SubscribeRequestFilterAccountsFilter {
filter: Some(Filter::TokenAccountState(false)),
}
}
})
.collect_vec()
} else {
vec![]
};
subscribe_programs.insert(
format!("program_accounts_on_demand_{}", index),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![program_id.clone()],
filters,
},
);
}
}
let program_subscribe_request = SubscribeRequest {
accounts: subscribe_programs,
slots: Default::default(),
transactions: Default::default(),
blocks: Default::default(),
blocks_meta: Default::default(),
entry: Default::default(),
commitment: Some(processed_commitment.into()),
accounts_data_slice: Default::default(),
ping: None,
};
log::info!(
"Accounts on demand subscribing to {}",
grpc_config.grpc_addr
);
let Ok(mut client) = yellowstone_grpc_client::GeyserGrpcClient::connect(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
None,
) else {
// problem connecting to grpc, retry after a sec
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
};
let Ok(account_stream) = client.subscribe_once2(program_subscribe_request).await else {
// problem subscribing to geyser stream, retry after a sec
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
};
// each account subscription batch will require individual stream
let mut subscriptions = vec![account_stream];
let mut index = 0;
for accounts_chunk in accounts_to_subscribe.iter().collect_vec().chunks(100) {
let mut accounts_subscription: HashMap<String, SubscribeRequestFilterAccounts> =
HashMap::new();
index += 1;
accounts_subscription.insert(
format!("account_sub_{}", index),
SubscribeRequestFilterAccounts {
account: accounts_chunk
.iter()
.map(|acc| (*acc).clone())
.collect_vec(),
owner: vec![],
filters: vec![],
},
);
let mut client = yellowstone_grpc_client::GeyserGrpcClient::connect(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
None,
)
.unwrap();
let account_request = SubscribeRequest {
accounts: accounts_subscription,
slots: Default::default(),
transactions: Default::default(),
blocks: Default::default(),
blocks_meta: Default::default(),
entry: Default::default(),
commitment: Some(processed_commitment.into()),
accounts_data_slice: Default::default(),
ping: None,
};
let account_stream = client.subscribe_once2(account_request).await.unwrap();
subscriptions.push(account_stream);
}
let mut merged_stream = subscriptions.merge();
while let Some(message) = merged_stream.next().await {
let message = match message {
Ok(message) => message,
Err(status) => {
log::error!("Account on demand grpc error : {}", status.message());
continue;
}
};
let Some(update) = message.update_oneof else {
continue;
};
has_started.notify_one();
match update {
UpdateOneof::Account(account) => {
if let Some(account_data) = account.account {
let account_pk_bytes: [u8; 32] = account_data
.pubkey
.try_into()
.expect("Pubkey should be 32 byte long");
let owner: [u8; 32] = account_data
.owner
.try_into()
.expect("owner pubkey should be deserializable");
let notification = AccountNotificationMessage {
data: AccountData {
pubkey: Pubkey::new_from_array(account_pk_bytes),
account: Arc::new(Account {
lamports: account_data.lamports,
data: account_data.data,
owner: Pubkey::new_from_array(owner),
executable: account_data.executable,
rent_epoch: account_data.rent_epoch,
}),
updated_slot: account.slot,
},
// TODO update with processed commitment / check above
commitment: Commitment::Processed,
};
if account_stream_sx.send(notification).is_err() {
// non recoverable, i.e the whole stream is being restarted
log::error!("Account stream broken, breaking from main loop");
break 'main_loop;
}
}
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping accounts stream");
}
_ => {
log::error!("GRPC accounts steam misconfigured");
}
};
}
}
Ok(())
})
}
pub fn create_grpc_account_streaming_tasks(
grpc_sources: Vec<GrpcSourceConfig>,
mut account_filter_watch: watch::Receiver<AccountFilters>,
@ -318,7 +101,7 @@ pub fn create_grpc_account_streaming_tasks(
let mut current_tasks = grpc_sources
.iter()
.map(|grpc_config| {
start_account_streaming_task(
start_account_streaming_tasks(
grpc_config.clone(),
accounts_filters.clone(),
account_sender.clone(),
@ -338,7 +121,7 @@ pub fn create_grpc_account_streaming_tasks(
let new_tasks = grpc_sources
.iter()
.map(|grpc_config| {
start_account_streaming_task(
start_account_streaming_tasks(
grpc_config.clone(),
accounts_filters.clone(),
account_sender.clone(),

View File

@ -21,7 +21,7 @@ use solana_rpc_client_api::{
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, slot_history::Slot};
use tokio::sync::broadcast::Sender;
use crate::account_store_interface::AccountStorageInterface;
use crate::account_store_interface::{AccountLoadingError, AccountStorageInterface};
lazy_static::lazy_static! {
static ref ACCOUNT_UPDATES: IntGauge =
@ -250,7 +250,7 @@ impl AccountService {
&self,
account: Pubkey,
config: Option<RpcAccountInfoConfig>,
) -> anyhow::Result<(Slot, Option<UiAccount>)> {
) -> Result<(Slot, Option<UiAccount>), AccountLoadingError> {
GET_ACCOUNT_CALLED.inc();
let commitment = config
.as_ref()
@ -259,7 +259,7 @@ impl AccountService {
let commitment = Commitment::from(commitment);
if let Some(account_data) = self.account_store.get_account(account, commitment).await {
if let Some(account_data) = self.account_store.get_account(account, commitment).await? {
// if minimum context slot is not satisfied return Null
let minimum_context_slot = config
.as_ref()
@ -273,10 +273,7 @@ impl AccountService {
Ok((account_data.updated_slot, None))
}
} else {
bail!(
"Account {} does not satisfy any configured filters",
account.to_string()
)
Err(AccountLoadingError::ConfigDoesnotContainRequiredFilters)
}
}

View File

@ -5,6 +5,13 @@ use solana_rpc_client_api::filter::RpcFilterType;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::slot_history::Slot;
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum AccountLoadingError {
AccountNotFound,
ConfigDoesnotContainRequiredFilters,
OperationTimeOut,
}
#[async_trait]
pub trait AccountStorageInterface: Send + Sync {
// Update account and return true if the account was sucessfylly updated
@ -12,7 +19,11 @@ pub trait AccountStorageInterface: Send + Sync {
async fn initilize_or_update_account(&self, account_data: AccountData);
async fn get_account(&self, account_pk: Pubkey, commitment: Commitment) -> Option<AccountData>;
async fn get_account(
&self,
account_pk: Pubkey,
commitment: Commitment,
) -> Result<Option<AccountData>, AccountLoadingError>;
async fn get_program_accounts(
&self,

View File

@ -1,6 +1,6 @@
use std::{collections::HashSet, sync::Arc};
use crate::account_store_interface::AccountStorageInterface;
use crate::account_store_interface::{AccountLoadingError, AccountStorageInterface};
use async_trait::async_trait;
use dashmap::{DashMap, DashSet};
use itertools::Itertools;
@ -313,11 +313,15 @@ impl AccountStorageInterface for InmemoryAccountStore {
}
}
async fn get_account(&self, account_pk: Pubkey, commitment: Commitment) -> Option<AccountData> {
async fn get_account(
&self,
account_pk: Pubkey,
commitment: Commitment,
) -> Result<Option<AccountData>, AccountLoadingError> {
if let Some(account_by_commitment) = self.account_store.get(&account_pk) {
account_by_commitment.get_account_data(commitment).clone()
Ok(account_by_commitment.get_account_data(commitment).clone())
} else {
None
Ok(None)
}
}
@ -331,7 +335,7 @@ impl AccountStorageInterface for InmemoryAccountStore {
let mut return_vec = vec![];
for program_account in program_accounts.iter() {
let account_data = self.get_account(*program_account, commitment).await;
if let Some(account_data) = account_data {
if let Ok(Some(account_data)) = account_data {
// recheck program owner and filters
if account_data.account.owner.eq(&program_pubkey) {
match &account_filters {
@ -483,28 +487,28 @@ mod tests {
assert_eq!(
store.get_account(pk1, Commitment::Processed).await,
Some(account_data_0.clone())
Ok(Some(account_data_0.clone()))
);
assert_eq!(
store.get_account(pk1, Commitment::Confirmed).await,
Some(account_data_0.clone())
Ok(Some(account_data_0.clone()))
);
assert_eq!(
store.get_account(pk1, Commitment::Finalized).await,
Some(account_data_0.clone())
Ok(Some(account_data_0.clone()))
);
assert_eq!(
store.get_account(pk2, Commitment::Processed).await,
Some(account_data_1.clone())
Ok(Some(account_data_1.clone()))
);
assert_eq!(
store.get_account(pk2, Commitment::Confirmed).await,
Some(account_data_1.clone())
Ok(Some(account_data_1.clone()))
);
assert_eq!(
store.get_account(pk2, Commitment::Finalized).await,
Some(account_data_1.clone())
Ok(Some(account_data_1.clone()))
);
let account_data_2 = create_random_account(&mut rng, 1, pk1, program);
@ -527,60 +531,60 @@ mod tests {
assert_eq!(
store.get_account(pk1, Commitment::Processed).await,
Some(account_data_5.clone())
Ok(Some(account_data_5.clone()))
);
assert_eq!(
store.get_account(pk1, Commitment::Confirmed).await,
Some(account_data_0.clone())
Ok(Some(account_data_0.clone()))
);
assert_eq!(
store.get_account(pk1, Commitment::Finalized).await,
Some(account_data_0.clone())
Ok(Some(account_data_0.clone()))
);
store.process_slot_data(1, Commitment::Confirmed).await;
assert_eq!(
store.get_account(pk1, Commitment::Processed).await,
Some(account_data_5.clone())
Ok(Some(account_data_5.clone()))
);
assert_eq!(
store.get_account(pk1, Commitment::Confirmed).await,
Some(account_data_2.clone())
Ok(Some(account_data_2.clone()))
);
assert_eq!(
store.get_account(pk1, Commitment::Finalized).await,
Some(account_data_0.clone())
Ok(Some(account_data_0.clone()))
);
store.process_slot_data(2, Commitment::Confirmed).await;
assert_eq!(
store.get_account(pk1, Commitment::Processed).await,
Some(account_data_5.clone())
Ok(Some(account_data_5.clone()))
);
assert_eq!(
store.get_account(pk1, Commitment::Confirmed).await,
Some(account_data_3.clone())
Ok(Some(account_data_3.clone()))
);
assert_eq!(
store.get_account(pk1, Commitment::Finalized).await,
Some(account_data_0.clone())
Ok(Some(account_data_0.clone()))
);
store.process_slot_data(1, Commitment::Finalized).await;
assert_eq!(
store.get_account(pk1, Commitment::Processed).await,
Some(account_data_5.clone())
Ok(Some(account_data_5.clone()))
);
assert_eq!(
store.get_account(pk1, Commitment::Confirmed).await,
Some(account_data_3.clone())
Ok(Some(account_data_3.clone()))
);
assert_eq!(
store.get_account(pk1, Commitment::Finalized).await,
Some(account_data_2.clone())
Ok(Some(account_data_2.clone()))
);
}
@ -690,7 +694,7 @@ mod tests {
assert_eq!(
store.get_account(pk1, Commitment::Finalized).await,
Some(last_account.clone()),
Ok(Some(last_account.clone())),
);
// check finalizing previous commitment does not affect
@ -698,7 +702,7 @@ mod tests {
assert_eq!(
store.get_account(pk1, Commitment::Finalized).await,
Some(last_account),
Ok(Some(last_account)),
);
}

View File

@ -11,13 +11,13 @@ use itertools::Itertools;
use solana_lite_rpc_core::{
commitment_utils::Commitment,
structures::{
account_data::{AccountData, AccountNotificationMessage, AccountStream},
account_data::{AccountData, AccountNotificationMessage},
account_filter::{AccountFilterType, AccountFilters, MemcmpFilterData},
},
AnyhowJoinHandle,
};
use solana_sdk::{account::Account, pubkey::Pubkey};
use tokio::sync::broadcast;
use tokio::sync::Notify;
use yellowstone_grpc_proto::geyser::{
subscribe_request_filter_accounts_filter::Filter,
subscribe_request_filter_accounts_filter_memcmp::Data, subscribe_update::UpdateOneof,
@ -25,10 +25,13 @@ use yellowstone_grpc_proto::geyser::{
SubscribeRequestFilterAccountsFilterMemcmp,
};
use crate::grpc::grpc_utils::connect_with_timeout_hacked;
pub fn start_account_streaming_tasks(
grpc_config: GrpcSourceConfig,
accounts_filters: AccountFilters,
account_stream_sx: tokio::sync::mpsc::UnboundedSender<AccountNotificationMessage>,
account_stream_sx: tokio::sync::broadcast::Sender<AccountNotificationMessage>,
has_started: Arc<Notify>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
'main_loop: loop {
@ -108,12 +111,11 @@ pub fn start_account_streaming_tasks(
ping: None,
};
let mut client = yellowstone_grpc_client::GeyserGrpcClient::connect(
let mut client = connect_with_timeout_hacked(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
None,
)
.unwrap();
.await?;
let account_stream = client.subscribe_once2(program_subscription).await.unwrap();
// each account subscription batch will require individual stream
@ -134,12 +136,11 @@ pub fn start_account_streaming_tasks(
filters: vec![],
},
);
let mut client = yellowstone_grpc_client::GeyserGrpcClient::connect(
let mut client = connect_with_timeout_hacked(
grpc_config.grpc_addr.clone(),
grpc_config.grpc_x_token.clone(),
None,
)
.unwrap();
.await?;
let account_request = SubscribeRequest {
accounts: accounts_subscription,
@ -159,11 +160,17 @@ pub fn start_account_streaming_tasks(
let mut merged_stream = subscriptions.merge();
while let Some(message) = merged_stream.next().await {
let message = message.unwrap();
let Ok(message) = message else {
// channel broken resubscribe
break;
};
let Some(update) = message.update_oneof else {
continue;
};
has_started.notify_one();
match update {
UpdateOneof::Account(account) => {
if let Some(account_data) = account.account {
@ -215,46 +222,50 @@ pub fn start_account_streaming_tasks(
pub fn create_grpc_account_streaming(
grpc_sources: Vec<GrpcSourceConfig>,
accounts_filters: AccountFilters,
) -> (AnyhowJoinHandle, AccountStream) {
let (account_sender, accounts_stream) = broadcast::channel::<AccountNotificationMessage>(1024);
account_stream_sx: tokio::sync::broadcast::Sender<AccountNotificationMessage>,
notify_abort: Arc<Notify>,
) -> AnyhowJoinHandle {
let jh: AnyhowJoinHandle = tokio::spawn(async move {
loop {
let (accounts_sx, mut accounts_rx) = tokio::sync::mpsc::unbounded_channel();
let jhs = grpc_sources
.iter()
.map(|grpc_config| {
start_account_streaming_tasks(
grpc_config.clone(),
accounts_filters.clone(),
accounts_sx.clone(),
account_stream_sx.clone(),
Arc::new(Notify::new()),
)
})
.collect_vec();
drop(accounts_sx);
let mut rx = account_stream_sx.subscribe();
loop {
match tokio::time::timeout(Duration::from_secs(60), accounts_rx.recv()).await {
Ok(Some(data)) => {
let _ = account_sender.send(data);
}
Ok(None) => {
log::error!("All grpc accounts channels close; restarting subscription");
break;
}
Err(_elapsed) => {
log::error!("No accounts data for a minute; restarting subscription");
tokio::select! {
data = tokio::time::timeout(Duration::from_secs(60), rx.recv()) => {
match data{
Ok(Ok(_)) => {
// do nothing / notification channel is working fine
}
Ok(Err(e)) => {
log::error!("Grpc stream failed by error : {e:?}");
break;
}
Err(_elapsed) => {
log::error!("No accounts data for a minute; restarting subscription");
break;
}
}
},
_ = notify_abort.notified() => {
log::debug!("Account stream aborted");
break;
}
}
}
for jh in jhs {
// abort previous handles
jh.abort();
}
jhs.iter().for_each(|x| x.abort());
}
});
(jh, accounts_stream)
jh
}

View File

@ -0,0 +1,38 @@
use bytes::Bytes;
use std::time::Duration;
use tonic::metadata::{errors::InvalidMetadataValue, AsciiMetadataValue};
use tonic::service::Interceptor;
use tonic::transport::ClientTlsConfig;
use tonic_health::pb::health_client::HealthClient;
use yellowstone_grpc_client::{GeyserGrpcClient, InterceptorXToken};
use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
use yellowstone_grpc_proto::tonic;
pub async fn connect_with_timeout_hacked<E, T>(
endpoint: E,
x_token: Option<T>,
) -> anyhow::Result<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
let endpoint = tonic::transport::Endpoint::from_shared(endpoint)?
.buffer_size(Some(65536))
.initial_connection_window_size(4194304)
.initial_stream_window_size(4194304)
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
// .http2_adaptive_window()
.tls_config(ClientTlsConfig::new())?;
let x_token: Option<AsciiMetadataValue> = x_token.map(|v| v.try_into()).transpose()?;
let interceptor = InterceptorXToken { x_token };
let channel = endpoint.connect_lazy();
let client = GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()),
);
Ok(client)
}

View File

@ -1 +1,2 @@
pub mod gprc_accounts_streaming;
pub mod grpc_accounts_streaming;
pub mod grpc_utils;

View File

@ -1,5 +1,6 @@
use crate::endpoint_stremers::EndpointStreaming;
use crate::grpc::gprc_accounts_streaming::create_grpc_account_streaming;
use crate::grpc::grpc_accounts_streaming::create_grpc_account_streaming;
use crate::grpc::grpc_utils::connect_with_timeout_hacked;
use crate::grpc_multiplex::{
create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_processed_slots_subscription,
};
@ -9,6 +10,7 @@ use geyser_grpc_connector::GrpcSourceConfig;
use itertools::Itertools;
use log::trace;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::structures::account_data::AccountNotificationMessage;
use solana_lite_rpc_core::structures::account_filter::AccountFilters;
use solana_lite_rpc_core::{
structures::produced_block::{ProducedBlock, TransactionInfo},
@ -34,7 +36,9 @@ use solana_transaction_status::{Reward, RewardType};
use std::cell::OnceCell;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Notify;
use tracing::trace_span;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterSlots, SubscribeUpdateSlot,
@ -266,48 +270,7 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>,
(cu_requested, prioritization_fees)
}
// TODO: use function from geyser-grpc-connector
use bytes::Bytes;
use std::time::Duration;
use tonic::metadata::{errors::InvalidMetadataValue, AsciiMetadataValue};
use tonic::service::Interceptor;
use tonic::transport::ClientTlsConfig;
use tonic_health::pb::health_client::HealthClient;
use yellowstone_grpc_client::{GeyserGrpcClient, InterceptorXToken};
use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
use yellowstone_grpc_proto::tonic;
// note: not called
async fn connect_with_timeout_hacked<E, T>(
endpoint: E,
x_token: Option<T>,
) -> anyhow::Result<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
let endpoint = tonic::transport::Endpoint::from_shared(endpoint)?
.buffer_size(Some(65536))
.initial_connection_window_size(4194304)
.initial_stream_window_size(4194304)
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
// .http2_adaptive_window()
.tls_config(ClientTlsConfig::new())?;
let x_token: Option<AsciiMetadataValue> = x_token.map(|v| v.try_into()).transpose()?;
let interceptor = InterceptorXToken { x_token };
let channel = endpoint.connect_lazy();
let client = GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()),
);
Ok(client)
}
// note used
// not called
pub fn create_block_processing_task(
grpc_addr: String,
grpc_x_token: Option<String>,
@ -456,18 +419,23 @@ pub fn create_grpc_subscription(
let cluster_info_polling = poll_cluster_info(rpc_client.clone(), cluster_info_sx);
let vote_accounts_polling = poll_vote_accounts(rpc_client.clone(), va_sx);
// accounts
if !accounts_filter.is_empty() {
let (account_jh, processed_account_stream) =
create_grpc_account_streaming(grpc_sources, accounts_filter);
let (account_sender, accounts_stream) =
tokio::sync::broadcast::channel::<AccountNotificationMessage>(1024);
let account_jh = create_grpc_account_streaming(
grpc_sources,
accounts_filter,
account_sender,
Arc::new(Notify::new()),
);
let streamers = EndpointStreaming {
blocks_notifier: block_multiplex_channel,
blockinfo_notifier: blockmeta_channel,
slot_notifier: slot_multiplex_channel,
cluster_info_notifier,
vote_account_notifier,
processed_account_stream: Some(processed_account_stream),
processed_account_stream: Some(accounts_stream),
};
let endpoint_tasks = vec![

View File

@ -440,7 +440,7 @@ fn setup_grpc_stream_debugging(blocks_notifier: &BlockStream) {
debugtask_blockstream_confirmation_sequence(blocks_notifier.resubscribe());
}
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
#[tokio::main()]
pub async fn main() -> anyhow::Result<()> {
setup_tracing_subscriber();