Implementing account on demand (#338)
* Implementing account on demand * Resubscribing accounts after every sometime * Changing account stream commitment to processed instead * Fixing account on demand, and account subscription bugs * Changing variable name for commitment from confirmed to processed * Some account optimizations * Adding get balance call * Changes after groovies review. * On demand accounts subscribe first then delete old subscription
This commit is contained in:
parent
064d47719e
commit
49bb7ff9f3
|
@ -2539,6 +2539,7 @@ dependencies = [
|
|||
"serde_json",
|
||||
"solana-account-decoder",
|
||||
"solana-lite-rpc-accounts",
|
||||
"solana-lite-rpc-accounts-on-demand",
|
||||
"solana-lite-rpc-address-lookup-tables",
|
||||
"solana-lite-rpc-blockstore",
|
||||
"solana-lite-rpc-cluster-endpoints",
|
||||
|
@ -4381,6 +4382,49 @@ dependencies = [
|
|||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "solana-lite-rpc-accounts-on-demand"
|
||||
version = "0.2.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"base64 0.21.7",
|
||||
"bincode",
|
||||
"bs58",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"dashmap 5.5.3",
|
||||
"futures",
|
||||
"itertools 0.10.5",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"prometheus",
|
||||
"quinn",
|
||||
"rand 0.8.5",
|
||||
"rand_chacha 0.3.1",
|
||||
"rustls",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"solana-account-decoder",
|
||||
"solana-address-lookup-table-program",
|
||||
"solana-client",
|
||||
"solana-lite-rpc-accounts",
|
||||
"solana-lite-rpc-cluster-endpoints",
|
||||
"solana-lite-rpc-core",
|
||||
"solana-net-utils",
|
||||
"solana-pubsub-client",
|
||||
"solana-rpc-client",
|
||||
"solana-rpc-client-api",
|
||||
"solana-sdk",
|
||||
"solana-streamer",
|
||||
"solana-transaction-status",
|
||||
"solana-version",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"yellowstone-grpc-client",
|
||||
"yellowstone-grpc-proto",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "solana-lite-rpc-address-lookup-tables"
|
||||
version = "0.2.4"
|
||||
|
|
|
@ -12,10 +12,11 @@ members = [
|
|||
"blockstore",
|
||||
"prioritization_fees",
|
||||
"bench",
|
||||
"address_lookup_tables",
|
||||
"address-lookup-tables",
|
||||
"accounts",
|
||||
"accounts-on-demand",
|
||||
#examples
|
||||
"examples/custom-tpu-send-transactions",
|
||||
"examples/custom-tpu-send-transactions"
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
|
@ -79,8 +80,9 @@ solana-lite-rpc-cluster-endpoints = {path = "cluster-endpoints", version="0.2.4"
|
|||
solana-lite-rpc-blockstore = {path = "blockstore", version="0.2.4"}
|
||||
solana-lite-rpc-stakevote = {path = "stake_vote", version="0.2.4"}
|
||||
solana-lite-rpc-prioritization-fees = {path = "prioritization_fees", version="0.2.4"}
|
||||
solana-lite-rpc-address-lookup-tables = {path = "address_lookup_tables", version="0.2.4"}
|
||||
solana-lite-rpc-address-lookup-tables = {path = "address-lookup-tables", version="0.2.4"}
|
||||
solana-lite-rpc-accounts = {path = "accounts", version = "0.2.4"}
|
||||
solana-lite-rpc-accounts-on-demand = {path = "accounts-on-demand", version = "0.2.4"}
|
||||
|
||||
async-trait = "0.1.68"
|
||||
yellowstone-grpc-client = { version = "1.13.0+solana.1.17.15", git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
[package]
|
||||
name = "solana-lite-rpc-accounts-on-demand"
|
||||
version = "0.2.4"
|
||||
edition = "2021"
|
||||
description = "Library which implements accounts on demand service in lite-rpc. Whenever account is not available it will fetch from rpc and create a geyser stream to update it automatically."
|
||||
rust-version = "1.73.0"
|
||||
repository = "https://github.com/blockworks-foundation/lite-rpc"
|
||||
license = "AGPL"
|
||||
|
||||
[dependencies]
|
||||
solana-sdk = { workspace = true }
|
||||
solana-rpc-client-api = { workspace = true }
|
||||
solana-transaction-status = { workspace = true }
|
||||
solana-version = { workspace = true }
|
||||
solana-client = { workspace = true }
|
||||
solana-net-utils = { workspace = true }
|
||||
solana-pubsub-client = { workspace = true }
|
||||
solana-rpc-client = { workspace = true }
|
||||
solana-streamer = { workspace = true }
|
||||
solana-account-decoder = { workspace = true }
|
||||
solana-address-lookup-table-program = { workspace = true }
|
||||
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tokio = "1.*"
|
||||
bincode = { workspace = true }
|
||||
bs58 = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
anyhow = { workspace = true }
|
||||
log = { workspace = true }
|
||||
dashmap = { workspace = true }
|
||||
quinn = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
rustls = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
prometheus = { workspace = true }
|
||||
lazy_static = { workspace = true }
|
||||
|
||||
solana-lite-rpc-core = { workspace = true }
|
||||
solana-lite-rpc-accounts = { workspace = true }
|
||||
solana-lite-rpc-cluster-endpoints = { workspace = true }
|
||||
|
||||
yellowstone-grpc-client = { workspace = true }
|
||||
yellowstone-grpc-proto = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
rand = "0.8.5"
|
||||
rand_chacha = "0.3.1"
|
|
@ -0,0 +1,231 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use dashmap::DashSet;
|
||||
use itertools::Itertools;
|
||||
use solana_client::{
|
||||
nonblocking::rpc_client::RpcClient,
|
||||
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
|
||||
rpc_filter::RpcFilterType,
|
||||
};
|
||||
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
|
||||
use solana_lite_rpc_cluster_endpoints::geyser_grpc_connector::GrpcSourceConfig;
|
||||
use solana_lite_rpc_core::{
|
||||
commitment_utils::Commitment,
|
||||
structures::{
|
||||
account_data::{AccountData, AccountNotificationMessage},
|
||||
account_filter::{AccountFilter, AccountFilterType, AccountFilters},
|
||||
},
|
||||
};
|
||||
use solana_sdk::{clock::Slot, pubkey::Pubkey};
|
||||
use tokio::sync::{broadcast::Sender, RwLock};
|
||||
|
||||
use crate::subscription_manager::SubscriptionManger;
|
||||
|
||||
pub struct AccountsOnDemand {
|
||||
rpc_client: Arc<RpcClient>,
|
||||
accounts_storage: Arc<dyn AccountStorageInterface>,
|
||||
accounts_subscribed: Arc<DashSet<Pubkey>>,
|
||||
program_filters: Arc<RwLock<AccountFilters>>,
|
||||
subscription_manager: SubscriptionManger,
|
||||
}
|
||||
|
||||
impl AccountsOnDemand {
|
||||
pub fn new(
|
||||
rpc_client: Arc<RpcClient>,
|
||||
grpc_sources: Vec<GrpcSourceConfig>,
|
||||
accounts_storage: Arc<dyn AccountStorageInterface>,
|
||||
account_notification_sender: Sender<AccountNotificationMessage>,
|
||||
) -> Self {
|
||||
Self {
|
||||
rpc_client,
|
||||
accounts_storage: accounts_storage.clone(),
|
||||
accounts_subscribed: Arc::new(DashSet::new()),
|
||||
program_filters: Arc::new(RwLock::new(vec![])),
|
||||
subscription_manager: SubscriptionManger::new(
|
||||
grpc_sources,
|
||||
accounts_storage,
|
||||
account_notification_sender,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn reset_subscription(&self) {
|
||||
let mut filters = self.get_filters().await;
|
||||
|
||||
// add additional filters related to accounts
|
||||
for accounts in &self
|
||||
.accounts_subscribed
|
||||
.iter()
|
||||
.map(|x| x.to_string())
|
||||
.chunks(100)
|
||||
{
|
||||
let account_filter = AccountFilter {
|
||||
accounts: accounts.collect(),
|
||||
program_id: None,
|
||||
filters: None,
|
||||
};
|
||||
filters.push(account_filter);
|
||||
}
|
||||
|
||||
self.subscription_manager
|
||||
.update_subscriptions(filters)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn get_filters(&self) -> AccountFilters {
|
||||
self.program_filters.read().await.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AccountStorageInterface for AccountsOnDemand {
|
||||
async fn update_account(&self, account_data: AccountData, commitment: Commitment) -> bool {
|
||||
self.accounts_storage
|
||||
.update_account(account_data, commitment)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn initilize_or_update_account(&self, account_data: AccountData) {
|
||||
self.accounts_storage
|
||||
.initilize_or_update_account(account_data)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_account(&self, account_pk: Pubkey, commitment: Commitment) -> Option<AccountData> {
|
||||
match self
|
||||
.accounts_storage
|
||||
.get_account(account_pk, commitment)
|
||||
.await
|
||||
{
|
||||
Some(account_data) => Some(account_data),
|
||||
None => {
|
||||
// account does not exist in account store
|
||||
// first check if we have already subscribed to the required account
|
||||
// This is to avoid resetting geyser subscription because of accounts that do not exists.
|
||||
if !self.accounts_subscribed.contains(&account_pk) {
|
||||
// get account from rpc and create its subscription
|
||||
self.accounts_subscribed.insert(account_pk);
|
||||
self.reset_subscription().await;
|
||||
let account_response = self
|
||||
.rpc_client
|
||||
.get_account_with_commitment(
|
||||
&account_pk,
|
||||
commitment.into_commiment_config(),
|
||||
)
|
||||
.await;
|
||||
if let Ok(response) = account_response {
|
||||
match response.value {
|
||||
Some(account) => {
|
||||
// update account in storage and return the account data
|
||||
let account_data = AccountData {
|
||||
pubkey: account_pk,
|
||||
account: Arc::new(account),
|
||||
updated_slot: response.context.slot,
|
||||
};
|
||||
self.accounts_storage
|
||||
.update_account(account_data.clone(), commitment)
|
||||
.await;
|
||||
Some(account_data)
|
||||
}
|
||||
// account does not exist
|
||||
None => None,
|
||||
}
|
||||
} else {
|
||||
// issue getting account, will then be updated by geyser
|
||||
None
|
||||
}
|
||||
} else {
|
||||
// we have already subscribed to the account and it does not exist
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_program_accounts(
|
||||
&self,
|
||||
program_pubkey: Pubkey,
|
||||
filters: Option<Vec<RpcFilterType>>,
|
||||
commitment: Commitment,
|
||||
) -> Option<Vec<AccountData>> {
|
||||
match self
|
||||
.accounts_storage
|
||||
.get_program_accounts(program_pubkey, filters.clone(), commitment)
|
||||
.await
|
||||
{
|
||||
Some(accounts) => {
|
||||
// filter is already present
|
||||
Some(accounts)
|
||||
}
|
||||
None => {
|
||||
// check if filter is already present
|
||||
let current_filters = self.get_filters().await;
|
||||
let account_filter = AccountFilter {
|
||||
accounts: vec![],
|
||||
program_id: Some(program_pubkey.to_string()),
|
||||
filters: filters
|
||||
.clone()
|
||||
.map(|v| v.iter().map(AccountFilterType::from).collect()),
|
||||
};
|
||||
if current_filters.contains(&account_filter) {
|
||||
// filter already exisits / there is no account data
|
||||
return None;
|
||||
}
|
||||
|
||||
// add into filters
|
||||
{
|
||||
let mut writelk = self.program_filters.write().await;
|
||||
writelk.push(account_filter.clone());
|
||||
}
|
||||
self.reset_subscription().await;
|
||||
|
||||
let rpc_response = self
|
||||
.rpc_client
|
||||
.get_program_accounts_with_config(
|
||||
&program_pubkey,
|
||||
RpcProgramAccountsConfig {
|
||||
filters,
|
||||
account_config: RpcAccountInfoConfig {
|
||||
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
|
||||
data_slice: None,
|
||||
commitment: Some(commitment.into_commiment_config()),
|
||||
min_context_slot: None,
|
||||
},
|
||||
with_context: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
match rpc_response {
|
||||
Ok(program_accounts) => {
|
||||
let program_accounts = program_accounts
|
||||
.iter()
|
||||
.map(|(pk, account)| AccountData {
|
||||
pubkey: *pk,
|
||||
account: Arc::new(account.clone()),
|
||||
updated_slot: 0,
|
||||
})
|
||||
.collect_vec();
|
||||
// add fetched accounts into cache
|
||||
for account_data in &program_accounts {
|
||||
self.accounts_storage
|
||||
.update_account(account_data.clone(), commitment)
|
||||
.await;
|
||||
}
|
||||
Some(program_accounts)
|
||||
}
|
||||
Err(e) => {
|
||||
log::warn!("Got error while getting program accounts with {e:?}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_slot_data(&self, slot: Slot, commitment: Commitment) -> Vec<AccountData> {
|
||||
self.accounts_storage
|
||||
.process_slot_data(slot, commitment)
|
||||
.await
|
||||
}
|
||||
}
|
|
@ -0,0 +1,2 @@
|
|||
pub mod accounts_on_demand;
|
||||
pub mod subscription_manager;
|
|
@ -0,0 +1,317 @@
|
|||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
|
||||
use solana_lite_rpc_cluster_endpoints::geyser_grpc_connector::GrpcSourceConfig;
|
||||
use solana_lite_rpc_core::{
|
||||
commitment_utils::Commitment,
|
||||
structures::{
|
||||
account_data::{AccountData, AccountNotificationMessage, AccountStream},
|
||||
account_filter::{AccountFilterType, AccountFilters, MemcmpFilterData},
|
||||
},
|
||||
AnyhowJoinHandle,
|
||||
};
|
||||
use solana_sdk::{account::Account, pubkey::Pubkey};
|
||||
use tokio::sync::{
|
||||
broadcast::{self, Sender},
|
||||
watch,
|
||||
};
|
||||
use yellowstone_grpc_proto::geyser::{
|
||||
subscribe_request_filter_accounts_filter::Filter,
|
||||
subscribe_request_filter_accounts_filter_memcmp::Data, subscribe_update::UpdateOneof,
|
||||
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter,
|
||||
SubscribeRequestFilterAccountsFilterMemcmp,
|
||||
};
|
||||
|
||||
pub struct SubscriptionManger {
|
||||
account_filter_watch: watch::Sender<AccountFilters>,
|
||||
}
|
||||
|
||||
impl SubscriptionManger {
|
||||
pub fn new(
|
||||
grpc_sources: Vec<GrpcSourceConfig>,
|
||||
accounts_storage: Arc<dyn AccountStorageInterface>,
|
||||
account_notification_sender: Sender<AccountNotificationMessage>,
|
||||
) -> Self {
|
||||
let (account_filter_watch, reciever) = watch::channel::<AccountFilters>(vec![]);
|
||||
|
||||
let (_, mut account_stream) = create_grpc_account_streaming_tasks(grpc_sources, reciever);
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match account_stream.recv().await {
|
||||
Ok(message) => {
|
||||
let _ = account_notification_sender.send(message.clone());
|
||||
accounts_storage
|
||||
.update_account(message.data, message.commitment)
|
||||
.await;
|
||||
}
|
||||
Err(e) => match e {
|
||||
broadcast::error::RecvError::Closed => {
|
||||
panic!("Account stream channel is broken");
|
||||
}
|
||||
broadcast::error::RecvError::Lagged(lag) => {
|
||||
log::error!("Account on demand stream lagged by {lag:?}, missed some account updates; continue");
|
||||
continue;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
Self {
|
||||
account_filter_watch,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn update_subscriptions(&self, filters: AccountFilters) {
|
||||
if let Err(e) = self.account_filter_watch.send(filters) {
|
||||
log::error!(
|
||||
"Error updating accounts on demand subscription with {}",
|
||||
e.to_string()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_account_streaming_task(
|
||||
grpc_config: GrpcSourceConfig,
|
||||
accounts_filters: AccountFilters,
|
||||
account_stream_sx: broadcast::Sender<AccountNotificationMessage>,
|
||||
has_started: Arc<AtomicBool>,
|
||||
) -> AnyhowJoinHandle {
|
||||
tokio::spawn(async move {
|
||||
'main_loop: loop {
|
||||
let processed_commitment = yellowstone_grpc_proto::geyser::CommitmentLevel::Processed;
|
||||
|
||||
let mut subscribe_accounts: HashMap<String, SubscribeRequestFilterAccounts> =
|
||||
HashMap::new();
|
||||
|
||||
for (index, accounts_filter) in accounts_filters.iter().enumerate() {
|
||||
if !accounts_filter.accounts.is_empty() {
|
||||
subscribe_accounts.insert(
|
||||
format!("accounts_on_demand_{index:?}"),
|
||||
SubscribeRequestFilterAccounts {
|
||||
account: accounts_filter
|
||||
.accounts
|
||||
.iter()
|
||||
.map(|x| x.to_string())
|
||||
.collect_vec(),
|
||||
owner: vec![],
|
||||
filters: vec![],
|
||||
},
|
||||
);
|
||||
}
|
||||
if let Some(program_id) = &accounts_filter.program_id {
|
||||
let filters = if let Some(filters) = &accounts_filter.filters {
|
||||
filters
|
||||
.iter()
|
||||
.map(|filter| match filter {
|
||||
AccountFilterType::Datasize(size) => {
|
||||
SubscribeRequestFilterAccountsFilter {
|
||||
filter: Some(Filter::Datasize(*size)),
|
||||
}
|
||||
}
|
||||
AccountFilterType::Memcmp(memcmp) => {
|
||||
SubscribeRequestFilterAccountsFilter {
|
||||
filter: Some(Filter::Memcmp(
|
||||
SubscribeRequestFilterAccountsFilterMemcmp {
|
||||
offset: memcmp.offset,
|
||||
data: Some(match &memcmp.data {
|
||||
MemcmpFilterData::Bytes(bytes) => {
|
||||
Data::Bytes(bytes.clone())
|
||||
}
|
||||
MemcmpFilterData::Base58(data) => {
|
||||
Data::Base58(data.clone())
|
||||
}
|
||||
MemcmpFilterData::Base64(data) => {
|
||||
Data::Base64(data.clone())
|
||||
}
|
||||
}),
|
||||
},
|
||||
)),
|
||||
}
|
||||
}
|
||||
AccountFilterType::TokenAccountState => {
|
||||
SubscribeRequestFilterAccountsFilter {
|
||||
filter: Some(Filter::TokenAccountState(false)),
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect_vec()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
subscribe_accounts.insert(
|
||||
format!("program_accounts_on_demand_{}", program_id),
|
||||
SubscribeRequestFilterAccounts {
|
||||
account: vec![],
|
||||
owner: vec![program_id.clone()],
|
||||
filters,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let subscribe_request = SubscribeRequest {
|
||||
accounts: subscribe_accounts,
|
||||
slots: Default::default(),
|
||||
transactions: Default::default(),
|
||||
blocks: Default::default(),
|
||||
blocks_meta: Default::default(),
|
||||
entry: Default::default(),
|
||||
commitment: Some(processed_commitment.into()),
|
||||
accounts_data_slice: Default::default(),
|
||||
ping: None,
|
||||
};
|
||||
|
||||
log::info!(
|
||||
"Accounts on demand subscribing to {}",
|
||||
grpc_config.grpc_addr
|
||||
);
|
||||
let Ok(mut client) = yellowstone_grpc_client::GeyserGrpcClient::connect(
|
||||
grpc_config.grpc_addr.clone(),
|
||||
grpc_config.grpc_x_token.clone(),
|
||||
None,
|
||||
) else {
|
||||
// problem connecting to grpc, retry after a sec
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
let Ok(mut account_stream) = client.subscribe_once2(subscribe_request).await else {
|
||||
// problem subscribing to geyser stream, retry after a sec
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
while let Some(message) = account_stream.next().await {
|
||||
let message = message.unwrap();
|
||||
let Some(update) = message.update_oneof else {
|
||||
continue;
|
||||
};
|
||||
|
||||
has_started.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
match update {
|
||||
UpdateOneof::Account(account) => {
|
||||
if let Some(account_data) = account.account {
|
||||
let account_pk_bytes: [u8; 32] = account_data
|
||||
.pubkey
|
||||
.try_into()
|
||||
.expect("Pubkey should be 32 byte long");
|
||||
let owner: [u8; 32] = account_data
|
||||
.owner
|
||||
.try_into()
|
||||
.expect("owner pubkey should be deserializable");
|
||||
let notification = AccountNotificationMessage {
|
||||
data: AccountData {
|
||||
pubkey: Pubkey::new_from_array(account_pk_bytes),
|
||||
account: Arc::new(Account {
|
||||
lamports: account_data.lamports,
|
||||
data: account_data.data,
|
||||
owner: Pubkey::new_from_array(owner),
|
||||
executable: account_data.executable,
|
||||
rent_epoch: account_data.rent_epoch,
|
||||
}),
|
||||
updated_slot: account.slot,
|
||||
},
|
||||
// TODO update with processed commitment / check above
|
||||
commitment: Commitment::Processed,
|
||||
};
|
||||
if account_stream_sx.send(notification).is_err() {
|
||||
// non recoverable, i.e the whole stream is being restarted
|
||||
log::error!("Account stream broken, breaking from main loop");
|
||||
break 'main_loop;
|
||||
}
|
||||
}
|
||||
}
|
||||
UpdateOneof::Ping(_) => {
|
||||
log::trace!("GRPC Ping accounts stream");
|
||||
}
|
||||
_ => {
|
||||
log::error!("GRPC accounts steam misconfigured");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub fn create_grpc_account_streaming_tasks(
|
||||
grpc_sources: Vec<GrpcSourceConfig>,
|
||||
mut account_filter_watch: watch::Receiver<AccountFilters>,
|
||||
) -> (AnyhowJoinHandle, AccountStream) {
|
||||
let (account_sender, accounts_stream) = broadcast::channel::<AccountNotificationMessage>(128);
|
||||
|
||||
let jh: AnyhowJoinHandle = tokio::spawn(async move {
|
||||
match account_filter_watch.changed().await {
|
||||
Ok(_) => {
|
||||
// do nothing
|
||||
}
|
||||
Err(e) => {
|
||||
log::error!("account filter watch failed with error {}", e);
|
||||
anyhow::bail!("Accounts on demand task failed");
|
||||
}
|
||||
}
|
||||
let accounts_filters = account_filter_watch.borrow_and_update().clone();
|
||||
|
||||
let has_started = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let mut current_tasks = grpc_sources
|
||||
.iter()
|
||||
.map(|grpc_config| {
|
||||
start_account_streaming_task(
|
||||
grpc_config.clone(),
|
||||
accounts_filters.clone(),
|
||||
account_sender.clone(),
|
||||
has_started.clone(),
|
||||
)
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
'check_watch: while account_filter_watch.changed().await.is_ok() {
|
||||
// wait for a second to get all the accounts to update
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
let accounts_filters = account_filter_watch.borrow_and_update().clone();
|
||||
|
||||
let has_started_new = Arc::new(AtomicBool::new(false));
|
||||
let elapsed_restart = tokio::time::Instant::now();
|
||||
|
||||
let new_tasks = grpc_sources
|
||||
.iter()
|
||||
.map(|grpc_config| {
|
||||
start_account_streaming_task(
|
||||
grpc_config.clone(),
|
||||
accounts_filters.clone(),
|
||||
account_sender.clone(),
|
||||
has_started_new.clone(),
|
||||
)
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
while !has_started_new.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
if elapsed_restart.elapsed() > Duration::from_secs(60) {
|
||||
// check if time elapsed during restart is greater than 60ms
|
||||
log::error!("Tried to restart the accounts on demand task but failed");
|
||||
new_tasks.iter().for_each(|x| x.abort());
|
||||
continue 'check_watch;
|
||||
}
|
||||
}
|
||||
|
||||
// abort previous tasks
|
||||
current_tasks.iter().for_each(|x| x.abort());
|
||||
|
||||
current_tasks = new_tasks;
|
||||
}
|
||||
log::error!("Accounts on demand task stopped");
|
||||
anyhow::bail!("Accounts on demand task stopped");
|
||||
});
|
||||
|
||||
(jh, accounts_stream)
|
||||
}
|
|
@ -29,8 +29,10 @@ pub struct AccountService {
|
|||
}
|
||||
|
||||
impl AccountService {
|
||||
pub fn new(account_store: Arc<dyn AccountStorageInterface>) -> Self {
|
||||
let (account_notification_sender, _) = tokio::sync::broadcast::channel(256);
|
||||
pub fn new(
|
||||
account_store: Arc<dyn AccountStorageInterface>,
|
||||
account_notification_sender: Sender<AccountNotificationMessage>,
|
||||
) -> Self {
|
||||
Self {
|
||||
account_store,
|
||||
account_notification_sender,
|
||||
|
@ -116,9 +118,9 @@ impl AccountService {
|
|||
for (index, account) in fetch_accounts.iter().enumerate() {
|
||||
if let Some(account) = account {
|
||||
self.account_store
|
||||
.initilize_account(AccountData {
|
||||
.initilize_or_update_account(AccountData {
|
||||
pubkey: accounts[index],
|
||||
account: account.clone(),
|
||||
account: Arc::new(account.clone()),
|
||||
updated_slot,
|
||||
})
|
||||
.await;
|
||||
|
@ -140,13 +142,16 @@ impl AccountService {
|
|||
loop {
|
||||
match account_stream.recv().await {
|
||||
Ok(account_notification) => {
|
||||
this.account_store
|
||||
if this
|
||||
.account_store
|
||||
.update_account(
|
||||
account_notification.data.clone(),
|
||||
account_notification.commitment,
|
||||
)
|
||||
.await;
|
||||
let _ = this.account_notification_sender.send(account_notification);
|
||||
.await
|
||||
{
|
||||
let _ = this.account_notification_sender.send(account_notification);
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(e)) => {
|
||||
log::error!(
|
||||
|
@ -183,11 +188,11 @@ impl AccountService {
|
|||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(e)) => {
|
||||
log::error!("Account Stream Lagged by {}", e);
|
||||
log::error!("Block Stream Lagged to update accounts by {}", e);
|
||||
continue;
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
log::error!("Account Stream Broken");
|
||||
log::error!("Block Stream Broken");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -210,7 +215,7 @@ impl AccountService {
|
|||
let data_slice = config.as_ref().map(|c| c.data_slice).unwrap_or_default();
|
||||
UiAccount::encode(
|
||||
&account_data.pubkey,
|
||||
&account_data.account,
|
||||
account_data.account.as_ref(),
|
||||
encoding,
|
||||
None,
|
||||
data_slice,
|
||||
|
@ -230,15 +235,14 @@ impl AccountService {
|
|||
let commitment = Commitment::from(commitment);
|
||||
|
||||
if let Some(account_data) = self.account_store.get_account(account, commitment).await {
|
||||
let ui_account =
|
||||
Self::convert_account_data_to_ui_account(&account_data, config.clone());
|
||||
|
||||
// if minimum context slot is not satisfied return Null
|
||||
let minimum_context_slot = config
|
||||
.as_ref()
|
||||
.map(|c| c.min_context_slot.unwrap_or_default())
|
||||
.unwrap_or_default();
|
||||
if minimum_context_slot <= account_data.updated_slot {
|
||||
let ui_account =
|
||||
Self::convert_account_data_to_ui_account(&account_data, config.clone());
|
||||
Ok((account_data.updated_slot, Some(ui_account)))
|
||||
} else {
|
||||
Ok((account_data.updated_slot, None))
|
||||
|
|
|
@ -7,9 +7,10 @@ use solana_sdk::slot_history::Slot;
|
|||
|
||||
#[async_trait]
|
||||
pub trait AccountStorageInterface: Send + Sync {
|
||||
async fn update_account(&self, account_data: AccountData, commitment: Commitment);
|
||||
// Update account and return true if the account was sucessfylly updated
|
||||
async fn update_account(&self, account_data: AccountData, commitment: Commitment) -> bool;
|
||||
|
||||
async fn initilize_account(&self, account_data: AccountData);
|
||||
async fn initilize_or_update_account(&self, account_data: AccountData);
|
||||
|
||||
async fn get_account(&self, account_pk: Pubkey, commitment: Commitment) -> Option<AccountData>;
|
||||
|
||||
|
|
|
@ -1,21 +1,18 @@
|
|||
use std::{
|
||||
collections::{BTreeSet, HashSet},
|
||||
sync::Arc,
|
||||
};
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
use crate::account_store_interface::AccountStorageInterface;
|
||||
use async_trait::async_trait;
|
||||
use dashmap::DashMap;
|
||||
use dashmap::{DashMap, DashSet};
|
||||
use itertools::Itertools;
|
||||
use solana_lite_rpc_core::{commitment_utils::Commitment, structures::account_data::AccountData};
|
||||
use solana_rpc_client_api::filter::RpcFilterType;
|
||||
use solana_sdk::{pubkey::Pubkey, slot_history::Slot};
|
||||
use std::collections::BTreeMap;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::account_store_interface::AccountStorageInterface;
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Default)]
|
||||
pub struct AccountDataByCommitment {
|
||||
pub pubkey: Pubkey,
|
||||
// should have maximum 32 entries, all processed slots which are not yet finalized
|
||||
pub processed_accounts: BTreeMap<Slot, AccountData>,
|
||||
pub confirmed_account: Option<AccountData>,
|
||||
pub finalized_account: Option<AccountData>,
|
||||
|
@ -35,24 +32,35 @@ impl AccountDataByCommitment {
|
|||
}
|
||||
}
|
||||
|
||||
// Should be used when accounts is created by geyser notification
|
||||
pub fn new(data: AccountData, commitment: Commitment) -> Self {
|
||||
let mut processed_accounts = BTreeMap::new();
|
||||
processed_accounts.insert(data.updated_slot, data.clone());
|
||||
AccountDataByCommitment {
|
||||
pubkey: data.pubkey,
|
||||
processed_accounts,
|
||||
confirmed_account: if commitment == Commitment::Confirmed {
|
||||
confirmed_account: if commitment == Commitment::Confirmed
|
||||
|| commitment == Commitment::Finalized
|
||||
{
|
||||
Some(data.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
finalized_account: if commitment == Commitment::Finalized {
|
||||
Some(data)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
finalized_account: None,
|
||||
}
|
||||
}
|
||||
|
||||
// should be called with finalized accounts data
|
||||
// when accounts storage is being warmed up
|
||||
pub fn initialize(data: AccountData) -> Self {
|
||||
let mut processed_accounts = BTreeMap::new();
|
||||
processed_accounts.insert(data.updated_slot, data.clone());
|
||||
AccountDataByCommitment {
|
||||
pubkey: data.pubkey,
|
||||
processed_accounts,
|
||||
confirmed_account: Some(data.clone()),
|
||||
finalized_account: Some(data),
|
||||
|
@ -74,12 +82,14 @@ impl AccountDataByCommitment {
|
|||
.map(|x| x.updated_slot < data.updated_slot)
|
||||
.unwrap_or(true);
|
||||
|
||||
let mut updated = false;
|
||||
if self.processed_accounts.get(&data.updated_slot).is_none() {
|
||||
// processed not present for the slot
|
||||
self.processed_accounts
|
||||
.insert(data.updated_slot, data.clone());
|
||||
updated = true;
|
||||
}
|
||||
let mut updated = false;
|
||||
|
||||
match commitment {
|
||||
Commitment::Confirmed => {
|
||||
if update_confirmed {
|
||||
|
@ -90,6 +100,7 @@ impl AccountDataByCommitment {
|
|||
Commitment::Finalized => {
|
||||
if update_confirmed {
|
||||
self.confirmed_account = Some(data.clone());
|
||||
updated = true;
|
||||
}
|
||||
if update_finalized {
|
||||
self.finalized_account = Some(data);
|
||||
|
@ -132,24 +143,17 @@ impl AccountDataByCommitment {
|
|||
}
|
||||
Commitment::Finalized => {
|
||||
// slot finalized remove data from processed
|
||||
while self
|
||||
.processed_accounts
|
||||
.first_key_value()
|
||||
.map(|(s, _)| *s)
|
||||
.unwrap_or(u64::MAX)
|
||||
<= slot
|
||||
while self.processed_accounts.len() > 1
|
||||
&& self
|
||||
.processed_accounts
|
||||
.first_key_value()
|
||||
.map(|(s, _)| *s)
|
||||
.unwrap_or(u64::MAX)
|
||||
<= slot
|
||||
{
|
||||
self.processed_accounts.pop_first();
|
||||
}
|
||||
|
||||
// processed map should not be empty
|
||||
if self.processed_accounts.is_empty() {
|
||||
log::error!(
|
||||
"Processed map should not be empty filling it with finalized data"
|
||||
);
|
||||
self.processed_accounts.insert(slot, account_data.clone());
|
||||
}
|
||||
|
||||
if self
|
||||
.finalized_account
|
||||
.as_ref()
|
||||
|
@ -165,17 +169,21 @@ impl AccountDataByCommitment {
|
|||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
} else if commitment == Commitment::Finalized {
|
||||
// remove processed slot data
|
||||
while self
|
||||
.processed_accounts
|
||||
.first_key_value()
|
||||
.map(|(s, _)| *s)
|
||||
.unwrap_or(u64::MAX)
|
||||
<= slot
|
||||
while self.processed_accounts.len() > 1
|
||||
&& self
|
||||
.processed_accounts
|
||||
.first_key_value()
|
||||
.map(|(s, _)| *s)
|
||||
.unwrap_or(u64::MAX)
|
||||
<= slot
|
||||
{
|
||||
self.processed_accounts.pop_first();
|
||||
}
|
||||
|
||||
None
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
@ -183,21 +191,21 @@ impl AccountDataByCommitment {
|
|||
|
||||
pub struct InmemoryAccountStore {
|
||||
account_store: Arc<DashMap<Pubkey, AccountDataByCommitment>>,
|
||||
confirmed_slots_map: RwLock<BTreeSet<Slot>>,
|
||||
owner_map_accounts: Arc<DashMap<Pubkey, HashSet<Pubkey>>>,
|
||||
confirmed_slots_map: DashSet<Slot>,
|
||||
accounts_by_owner: Arc<DashMap<Pubkey, HashSet<Pubkey>>>,
|
||||
}
|
||||
|
||||
impl InmemoryAccountStore {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
account_store: Arc::new(DashMap::new()),
|
||||
confirmed_slots_map: RwLock::new(BTreeSet::new()),
|
||||
owner_map_accounts: Arc::new(DashMap::new()),
|
||||
confirmed_slots_map: DashSet::new(),
|
||||
accounts_by_owner: Arc::new(DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_account_owner(&self, account: Pubkey, owner: Pubkey) {
|
||||
match self.owner_map_accounts.entry(owner) {
|
||||
match self.accounts_by_owner.entry(owner) {
|
||||
dashmap::mapref::entry::Entry::Occupied(mut occ) => {
|
||||
occ.get_mut().insert(account);
|
||||
}
|
||||
|
@ -217,12 +225,11 @@ impl InmemoryAccountStore {
|
|||
new_account_data: &AccountData,
|
||||
commitment: Commitment,
|
||||
) {
|
||||
if prev_account_data.pubkey == new_account_data.pubkey
|
||||
&& prev_account_data.account.owner != new_account_data.account.owner
|
||||
{
|
||||
assert_eq!(prev_account_data.pubkey, new_account_data.pubkey);
|
||||
if prev_account_data.account.owner != new_account_data.account.owner {
|
||||
if commitment == Commitment::Finalized {
|
||||
match self
|
||||
.owner_map_accounts
|
||||
.accounts_by_owner
|
||||
.entry(prev_account_data.account.owner)
|
||||
{
|
||||
dashmap::mapref::entry::Entry::Occupied(mut occ) => {
|
||||
|
@ -240,12 +247,11 @@ impl InmemoryAccountStore {
|
|||
|
||||
#[async_trait]
|
||||
impl AccountStorageInterface for InmemoryAccountStore {
|
||||
async fn update_account(&self, account_data: AccountData, commitment: Commitment) {
|
||||
async fn update_account(&self, account_data: AccountData, commitment: Commitment) -> bool {
|
||||
let slot = account_data.updated_slot;
|
||||
// check if the blockhash and slot is already confirmed
|
||||
let commitment = if commitment == Commitment::Processed {
|
||||
let lk = self.confirmed_slots_map.read().await;
|
||||
if lk.contains(&slot) {
|
||||
if self.confirmed_slots_map.contains(&slot) {
|
||||
Commitment::Confirmed
|
||||
} else {
|
||||
Commitment::Processed
|
||||
|
@ -257,10 +263,16 @@ impl AccountStorageInterface for InmemoryAccountStore {
|
|||
match self.account_store.entry(account_data.pubkey) {
|
||||
dashmap::mapref::entry::Entry::Occupied(mut occ) => {
|
||||
let prev_account = occ.get().get_account_data(commitment);
|
||||
if let Some(prev_account) = prev_account {
|
||||
self.update_owner(&prev_account, &account_data, commitment);
|
||||
|
||||
// if account has been updated
|
||||
if occ.get_mut().update(account_data.clone(), commitment) {
|
||||
if let Some(prev_account) = prev_account {
|
||||
self.update_owner(&prev_account, &account_data, commitment);
|
||||
}
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
occ.get_mut().update(account_data, commitment);
|
||||
}
|
||||
dashmap::mapref::entry::Entry::Vacant(vac) => {
|
||||
self.add_account_owner(account_data.pubkey, account_data.account.owner);
|
||||
|
@ -268,11 +280,12 @@ impl AccountStorageInterface for InmemoryAccountStore {
|
|||
account_data.clone(),
|
||||
commitment,
|
||||
));
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn initilize_account(&self, account_data: AccountData) {
|
||||
async fn initilize_or_update_account(&self, account_data: AccountData) {
|
||||
match self.account_store.contains_key(&account_data.pubkey) {
|
||||
true => {
|
||||
// account has already been filled by an event
|
||||
|
@ -303,7 +316,7 @@ impl AccountStorageInterface for InmemoryAccountStore {
|
|||
account_filters: Option<Vec<RpcFilterType>>,
|
||||
commitment: Commitment,
|
||||
) -> Option<Vec<AccountData>> {
|
||||
if let Some(program_accounts) = self.owner_map_accounts.get(&program_pubkey) {
|
||||
if let Some(program_accounts) = self.accounts_by_owner.get(&program_pubkey) {
|
||||
let mut return_vec = vec![];
|
||||
for program_account in program_accounts.iter() {
|
||||
let account_data = self.get_account(*program_account, commitment).await;
|
||||
|
@ -334,15 +347,13 @@ impl AccountStorageInterface for InmemoryAccountStore {
|
|||
Commitment::Confirmed => {
|
||||
// insert slot and blockhash that were confirmed
|
||||
{
|
||||
let mut lk = self.confirmed_slots_map.write().await;
|
||||
lk.insert(slot);
|
||||
self.confirmed_slots_map.insert(slot);
|
||||
}
|
||||
}
|
||||
Commitment::Finalized => {
|
||||
// remove finalized slots form confirmed map
|
||||
{
|
||||
let mut lk = self.confirmed_slots_map.write().await;
|
||||
if !lk.remove(&slot) {
|
||||
if self.confirmed_slots_map.remove(&slot).is_none() {
|
||||
log::warn!(
|
||||
"following slot {} were not confirmed by account storage",
|
||||
slot
|
||||
|
@ -356,7 +367,6 @@ impl AccountStorageInterface for InmemoryAccountStore {
|
|||
return vec![];
|
||||
}
|
||||
}
|
||||
|
||||
let updated_accounts = self
|
||||
.account_store
|
||||
.iter_mut()
|
||||
|
@ -400,6 +410,8 @@ impl Default for InmemoryAccountStore {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use itertools::Itertools;
|
||||
use rand::{rngs::ThreadRng, Rng};
|
||||
use solana_lite_rpc_core::{
|
||||
|
@ -421,13 +433,13 @@ mod tests {
|
|||
let length: usize = rng.gen_range(100..1000);
|
||||
AccountData {
|
||||
pubkey,
|
||||
account: Account {
|
||||
account: Arc::new(Account {
|
||||
lamports: rng.gen(),
|
||||
data: (0..length).map(|_| rng.gen::<u8>()).collect_vec(),
|
||||
owner: program,
|
||||
executable: false,
|
||||
rent_epoch: 0,
|
||||
},
|
||||
}),
|
||||
updated_slot,
|
||||
}
|
||||
}
|
||||
|
@ -441,10 +453,14 @@ mod tests {
|
|||
let pk2 = Pubkey::new_unique();
|
||||
|
||||
let account_data_0 = create_random_account(&mut rng, 0, pk1, program);
|
||||
store.initilize_account(account_data_0.clone()).await;
|
||||
store
|
||||
.initilize_or_update_account(account_data_0.clone())
|
||||
.await;
|
||||
|
||||
let account_data_1 = create_random_account(&mut rng, 0, pk2, program);
|
||||
store.initilize_account(account_data_1.clone()).await;
|
||||
store
|
||||
.initilize_or_update_account(account_data_1.clone())
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
store.get_account(pk1, Commitment::Processed).await,
|
||||
|
@ -559,7 +575,7 @@ mod tests {
|
|||
let mut rng = rand::thread_rng();
|
||||
|
||||
store
|
||||
.initilize_account(create_random_account(&mut rng, 0, pk1, program))
|
||||
.initilize_or_update_account(create_random_account(&mut rng, 0, pk1, program))
|
||||
.await;
|
||||
|
||||
store
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
use futures::StreamExt;
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
|
||||
use geyser_grpc_connector::GrpcSourceConfig;
|
||||
use geyser_grpc_connector::Message::GeyserSubscribeUpdate;
|
||||
use itertools::Itertools;
|
||||
use solana_lite_rpc_core::{
|
||||
commitment_utils::Commitment,
|
||||
|
@ -28,10 +27,7 @@ pub fn start_account_streaming_tasks(
|
|||
) -> AnyhowJoinHandle {
|
||||
tokio::spawn(async move {
|
||||
'main_loop: loop {
|
||||
// for now we can only be sure that there is one confirmed block per slot, for processed there can be multiple confirmed blocks
|
||||
// So setting commitment to confirmed
|
||||
// To do somehow make it processed, we we could get blockhash with slot it should be ideal
|
||||
let confirmed_commitment = yellowstone_grpc_proto::geyser::CommitmentLevel::Confirmed;
|
||||
let processed_commitment = yellowstone_grpc_proto::geyser::CommitmentLevel::Processed;
|
||||
|
||||
let mut subscribe_accounts: HashMap<String, SubscribeRequestFilterAccounts> =
|
||||
HashMap::new();
|
||||
|
@ -81,6 +77,11 @@ pub fn start_account_streaming_tasks(
|
|||
)),
|
||||
}
|
||||
}
|
||||
AccountFilterType::TokenAccountState => {
|
||||
SubscribeRequestFilterAccountsFilter {
|
||||
filter: Some(Filter::TokenAccountState(false)),
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect_vec()
|
||||
} else {
|
||||
|
@ -104,14 +105,21 @@ pub fn start_account_streaming_tasks(
|
|||
blocks: Default::default(),
|
||||
blocks_meta: Default::default(),
|
||||
entry: Default::default(),
|
||||
commitment: Some(confirmed_commitment.into()),
|
||||
commitment: Some(processed_commitment.into()),
|
||||
accounts_data_slice: Default::default(),
|
||||
ping: None,
|
||||
};
|
||||
let (_abort_handler, mut accounts_stream) =
|
||||
create_geyser_autoconnection_task(grpc_config.clone(), subscribe_request);
|
||||
|
||||
while let Some(GeyserSubscribeUpdate(message)) = accounts_stream.recv().await {
|
||||
let mut client = yellowstone_grpc_client::GeyserGrpcClient::connect(
|
||||
grpc_config.grpc_addr.clone(),
|
||||
grpc_config.grpc_x_token.clone(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let mut account_stream = client.subscribe_once2(subscribe_request).await.unwrap();
|
||||
|
||||
while let Some(message) = account_stream.next().await {
|
||||
let message = message.unwrap();
|
||||
let Some(update) = message.update_oneof else {
|
||||
continue;
|
||||
};
|
||||
|
@ -130,17 +138,17 @@ pub fn start_account_streaming_tasks(
|
|||
let notification = AccountNotificationMessage {
|
||||
data: AccountData {
|
||||
pubkey: Pubkey::new_from_array(account_pk_bytes),
|
||||
account: Account {
|
||||
account: Arc::new(Account {
|
||||
lamports: account_data.lamports,
|
||||
data: account_data.data,
|
||||
owner: Pubkey::new_from_array(owner),
|
||||
executable: account_data.executable,
|
||||
rent_epoch: account_data.rent_epoch,
|
||||
},
|
||||
}),
|
||||
updated_slot: account.slot,
|
||||
},
|
||||
// TODO update with processed commitment / check above
|
||||
commitment: Commitment::Confirmed,
|
||||
commitment: Commitment::Processed,
|
||||
};
|
||||
if account_stream_sx.send(notification).is_err() {
|
||||
// non recoverable, i.e the whole stream is being restarted
|
||||
|
@ -168,12 +176,12 @@ pub fn create_grpc_account_streaming(
|
|||
grpc_sources: Vec<GrpcSourceConfig>,
|
||||
accounts_filters: AccountFilters,
|
||||
) -> (AnyhowJoinHandle, AccountStream) {
|
||||
let (account_sender, accounts_stream) = broadcast::channel::<AccountNotificationMessage>(128);
|
||||
let (account_sender, accounts_stream) = broadcast::channel::<AccountNotificationMessage>(1024);
|
||||
|
||||
let jh: AnyhowJoinHandle = tokio::spawn(async move {
|
||||
loop {
|
||||
let (accounts_sx, mut accounts_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
grpc_sources
|
||||
let jhs = grpc_sources
|
||||
.iter()
|
||||
.map(|grpc_config| {
|
||||
start_account_streaming_tasks(
|
||||
|
@ -200,6 +208,11 @@ pub fn create_grpc_account_streaming(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
for jh in jhs {
|
||||
// abort previous handles
|
||||
jh.abort();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use solana_rpc_client_api::filter::RpcFilterType;
|
||||
use solana_sdk::{account::Account, pubkey::Pubkey, slot_history::Slot};
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
|
||||
use crate::commitment_utils::Commitment;
|
||||
|
||||
#[derive(Clone, Eq, PartialEq, Debug)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AccountData {
|
||||
pub pubkey: Pubkey,
|
||||
pub account: Account,
|
||||
pub account: Arc<Account>,
|
||||
pub updated_slot: Slot,
|
||||
}
|
||||
|
||||
|
@ -24,6 +26,14 @@ impl AccountData {
|
|||
}
|
||||
}
|
||||
|
||||
impl PartialEq for AccountData {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.pubkey == other.pubkey
|
||||
&& *self.account == *other.account
|
||||
&& self.updated_slot == other.updated_slot
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AccountNotificationMessage {
|
||||
pub data: AccountData,
|
||||
|
|
|
@ -22,6 +22,7 @@ pub struct MemcmpFilter {
|
|||
pub enum AccountFilterType {
|
||||
Datasize(u64),
|
||||
Memcmp(MemcmpFilter),
|
||||
TokenAccountState,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
|
||||
|
@ -53,12 +54,31 @@ impl AccountFilter {
|
|||
};
|
||||
RpcFilterType::Memcmp(RpcMemcmp::new(memcpy.offset as usize, encoded_bytes))
|
||||
}
|
||||
AccountFilterType::TokenAccountState => RpcFilterType::TokenAccountState,
|
||||
})
|
||||
.collect_vec()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(deprecated)]
|
||||
impl From<&RpcFilterType> for AccountFilterType {
|
||||
fn from(value: &RpcFilterType) -> Self {
|
||||
match value {
|
||||
RpcFilterType::DataSize(size) => AccountFilterType::Datasize(*size),
|
||||
RpcFilterType::Memcmp(memcmp) => {
|
||||
let bytes = memcmp.bytes().map(|x| (*x).clone()).unwrap_or_default();
|
||||
let offset = memcmp.offset as u64;
|
||||
AccountFilterType::Memcmp(MemcmpFilter {
|
||||
offset,
|
||||
data: MemcmpFilterData::Bytes(bytes),
|
||||
})
|
||||
}
|
||||
RpcFilterType::TokenAccountState => AccountFilterType::TokenAccountState,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type AccountFilters = Vec<AccountFilter>;
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -59,6 +59,7 @@ solana-lite-rpc-blockstore = { workspace = true }
|
|||
solana-lite-rpc-prioritization-fees = { workspace = true }
|
||||
solana-lite-rpc-address-lookup-tables = { workspace = true }
|
||||
solana-lite-rpc-accounts = { workspace = true }
|
||||
solana-lite-rpc-accounts-on-demand = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
bench = { path = "../bench" }
|
||||
|
|
|
@ -563,14 +563,11 @@ impl LiteRpcServer for LiteBridge {
|
|||
ui_accounts.push(ui_account);
|
||||
}
|
||||
Err(_) => {
|
||||
// internal error while fetching multiple accounts
|
||||
return Err(jsonrpsee::types::error::ErrorCode::ServerError(
|
||||
RpcErrors::AccountNotFound as i32,
|
||||
)
|
||||
.into());
|
||||
ui_accounts.push(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
assert_eq!(ui_accounts.len(), pubkey_strs.len());
|
||||
Ok(RpcResponse {
|
||||
context: RpcResponseContext {
|
||||
slot: max_slot,
|
||||
|
@ -617,4 +614,42 @@ impl LiteRpcServer for LiteBridge {
|
|||
Err(jsonrpsee::types::error::ErrorCode::MethodNotFound.into())
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_balance(
|
||||
&self,
|
||||
pubkey_str: String,
|
||||
config: Option<RpcContextConfig>,
|
||||
) -> RpcResult<RpcResponse<u64>> {
|
||||
let Ok(pubkey) = Pubkey::from_str(&pubkey_str) else {
|
||||
// pubkey is invalid
|
||||
return Err(jsonrpsee::types::error::ErrorCode::InvalidParams.into());
|
||||
};
|
||||
let config = config.map(|x| RpcAccountInfoConfig {
|
||||
encoding: None,
|
||||
data_slice: None,
|
||||
commitment: x.commitment,
|
||||
min_context_slot: x.min_context_slot,
|
||||
});
|
||||
if let Some(account_service) = &self.accounts_service {
|
||||
match account_service.get_account(pubkey, config).await {
|
||||
Ok((slot, ui_account)) => Ok(RpcResponse {
|
||||
context: RpcResponseContext {
|
||||
slot,
|
||||
api_version: None,
|
||||
},
|
||||
value: ui_account.map(|x| x.lamports).unwrap_or_default(),
|
||||
}),
|
||||
Err(_) => {
|
||||
// account not found
|
||||
Err(jsonrpsee::types::error::ErrorCode::ServerError(
|
||||
RpcErrors::AccountNotFound as i32,
|
||||
)
|
||||
.into())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// accounts are disabled
|
||||
Err(jsonrpsee::types::error::ErrorCode::MethodNotFound.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ use solana_lite_rpc_core::{
|
|||
commitment_utils::Commitment, stores::data_cache::DataCache,
|
||||
structures::account_data::AccountNotificationMessage, types::BlockStream,
|
||||
};
|
||||
use std::{str::FromStr, sync::Arc};
|
||||
use std::{str::FromStr, sync::Arc, time::Duration};
|
||||
use tokio::sync::broadcast::error::RecvError::{Closed, Lagged};
|
||||
|
||||
use crate::{
|
||||
|
@ -236,19 +236,20 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
RPC_BLOCK_PRIOFEES_SUBSCRIBE.inc();
|
||||
|
||||
'recv_loop: loop {
|
||||
match account_fees_stream.recv().await {
|
||||
Ok(AccountPrioFeesUpdateMessage {
|
||||
match tokio::time::timeout(Duration::from_secs(1), account_fees_stream.recv()).await
|
||||
{
|
||||
Ok(Ok(AccountPrioFeesUpdateMessage {
|
||||
slot,
|
||||
accounts_data,
|
||||
}) => {
|
||||
if let Some(account_data) = accounts_data.get(&account) {
|
||||
})) => {
|
||||
if let Some(account_stats) = accounts_data.get(&account) {
|
||||
let result_message =
|
||||
jsonrpsee::SubscriptionMessage::from_json(&RpcResponse {
|
||||
context: RpcResponseContext {
|
||||
slot,
|
||||
api_version: None,
|
||||
},
|
||||
value: account_data,
|
||||
value: account_stats,
|
||||
});
|
||||
|
||||
match sink.send(result_message.unwrap()).await {
|
||||
|
@ -263,7 +264,7 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
};
|
||||
}
|
||||
}
|
||||
Err(Lagged(lagged)) => {
|
||||
Ok(Err(Lagged(lagged))) => {
|
||||
// this usually happens if there is one "slow receiver", see https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging
|
||||
log::warn!(
|
||||
"subscriber laggs some({}) priofees update messages - continue",
|
||||
|
@ -271,10 +272,16 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
);
|
||||
continue 'recv_loop;
|
||||
}
|
||||
Err(Closed) => {
|
||||
Ok(Err(Closed)) => {
|
||||
log::error!("failed to receive block, sender closed - aborting");
|
||||
return;
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
// check if subscription is closed
|
||||
if sink.is_closed() {
|
||||
break 'recv_loop;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -297,6 +304,11 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
"Accounts service not configured".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
let account_config = config.clone().unwrap_or_default();
|
||||
let config_commitment = account_config.commitment.unwrap_or_default();
|
||||
let min_context_slot = account_config.min_context_slot.unwrap_or_default();
|
||||
|
||||
let sink = pending.accept().await?;
|
||||
let mut accounts_stream = accounts_service.account_notification_sender.subscribe();
|
||||
|
||||
|
@ -304,8 +316,8 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
RPC_ACCOUNT_SUBSCRIBE.inc();
|
||||
|
||||
loop {
|
||||
match accounts_stream.recv().await {
|
||||
Ok(AccountNotificationMessage { data, commitment }) => {
|
||||
match tokio::time::timeout(Duration::from_secs(1), accounts_stream.recv()).await {
|
||||
Ok(Ok(AccountNotificationMessage { data, commitment })) => {
|
||||
if sink.is_closed() {
|
||||
// sink is already closed
|
||||
return;
|
||||
|
@ -315,9 +327,6 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
// notification is different account
|
||||
continue;
|
||||
}
|
||||
let account_config = config.clone().unwrap_or_default();
|
||||
let config_commitment = account_config.commitment.unwrap_or_default();
|
||||
let min_context_slot = account_config.min_context_slot.unwrap_or_default();
|
||||
// check config
|
||||
// check if commitment match
|
||||
if Commitment::from(config_commitment) != commitment {
|
||||
|
@ -351,7 +360,7 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
}
|
||||
};
|
||||
}
|
||||
Err(Lagged(lagged)) => {
|
||||
Ok(Err(Lagged(lagged))) => {
|
||||
// this usually happens if there is one "slow receiver", see https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging
|
||||
log::warn!(
|
||||
"subscriber laggs some({}) accounts messages - continue",
|
||||
|
@ -359,12 +368,18 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
);
|
||||
continue;
|
||||
}
|
||||
Err(Closed) => {
|
||||
Ok(Err(Closed)) => {
|
||||
log::error!(
|
||||
"failed to receive account notifications, sender closed - aborting"
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
// on timeout check if sink is still open
|
||||
if sink.is_closed() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -390,12 +405,19 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
let sink = pending.accept().await?;
|
||||
let mut accounts_stream = accounts_service.account_notification_sender.subscribe();
|
||||
|
||||
let program_config = config.clone().unwrap_or_default();
|
||||
let config_commitment = program_config.account_config.commitment.unwrap_or_default();
|
||||
let min_context_slot = program_config
|
||||
.account_config
|
||||
.min_context_slot
|
||||
.unwrap_or_default();
|
||||
|
||||
tokio::spawn(async move {
|
||||
RPC_ACCOUNT_SUBSCRIBE.inc();
|
||||
|
||||
loop {
|
||||
match accounts_stream.recv().await {
|
||||
Ok(AccountNotificationMessage { data, commitment }) => {
|
||||
match tokio::time::timeout(Duration::from_secs(1), accounts_stream.recv()).await {
|
||||
Ok(Ok(AccountNotificationMessage { data, commitment })) => {
|
||||
if sink.is_closed() {
|
||||
// sink is already closed
|
||||
return;
|
||||
|
@ -404,14 +426,6 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
// wrong program owner
|
||||
continue;
|
||||
}
|
||||
|
||||
let program_config = config.clone().unwrap_or_default();
|
||||
let config_commitment =
|
||||
program_config.account_config.commitment.unwrap_or_default();
|
||||
let min_context_slot = program_config
|
||||
.account_config
|
||||
.min_context_slot
|
||||
.unwrap_or_default();
|
||||
// check config
|
||||
// check if commitment match
|
||||
if Commitment::from(config_commitment) != commitment {
|
||||
|
@ -422,7 +436,7 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
continue;
|
||||
}
|
||||
// check filters
|
||||
if let Some(filters) = program_config.filters {
|
||||
if let Some(filters) = &program_config.filters {
|
||||
if filters.iter().any(|filter| !data.allows(filter)) {
|
||||
// filters not stasfied
|
||||
continue;
|
||||
|
@ -445,7 +459,6 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
},
|
||||
value,
|
||||
});
|
||||
|
||||
match sink.send(result_message.unwrap()).await {
|
||||
Ok(()) => {
|
||||
// success
|
||||
|
@ -457,7 +470,7 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
}
|
||||
};
|
||||
}
|
||||
Err(Lagged(lagged)) => {
|
||||
Ok(Err(Lagged(lagged))) => {
|
||||
// this usually happens if there is one "slow receiver", see https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html#lagging
|
||||
log::warn!(
|
||||
"subscriber laggs some({}) program accounts messages - continue",
|
||||
|
@ -465,12 +478,18 @@ impl LiteRpcPubSubServer for LitePubSubBridge {
|
|||
);
|
||||
continue;
|
||||
}
|
||||
Err(Closed) => {
|
||||
Ok(Err(Closed)) => {
|
||||
log::error!(
|
||||
"failed to receive account notifications, sender closed - aborting"
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(_elapsed) => {
|
||||
// on timeout check if sink is still open
|
||||
if sink.is_closed() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -87,6 +87,9 @@ pub struct Config {
|
|||
|
||||
#[serde(default)]
|
||||
pub account_filters: Option<String>,
|
||||
|
||||
#[serde(default)]
|
||||
pub enable_accounts_on_demand_accounts_service: bool,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
|
|
|
@ -3,6 +3,7 @@ pub mod rpc_tester;
|
|||
use crate::rpc_tester::RpcTester;
|
||||
use anyhow::bail;
|
||||
use dashmap::DashMap;
|
||||
use itertools::Itertools;
|
||||
use lite_rpc::bridge::LiteBridge;
|
||||
use lite_rpc::bridge_pubsub::LitePubSubBridge;
|
||||
use lite_rpc::cli::Config;
|
||||
|
@ -14,6 +15,7 @@ use log::{debug, info};
|
|||
use solana_lite_rpc_accounts::account_service::AccountService;
|
||||
use solana_lite_rpc_accounts::account_store_interface::AccountStorageInterface;
|
||||
use solana_lite_rpc_accounts::inmemory_account_store::InmemoryAccountStore;
|
||||
use solana_lite_rpc_accounts_on_demand::accounts_on_demand::AccountsOnDemand;
|
||||
use solana_lite_rpc_address_lookup_tables::address_lookup_table_store::AddressLookupTableStore;
|
||||
use solana_lite_rpc_blockstore::history::History;
|
||||
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
|
||||
|
@ -133,6 +135,7 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
|
|||
enable_address_lookup_tables,
|
||||
address_lookup_tables_binary,
|
||||
account_filters,
|
||||
enable_accounts_on_demand_accounts_service,
|
||||
..
|
||||
} = args;
|
||||
|
||||
|
@ -153,15 +156,26 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
|
|||
vec![]
|
||||
};
|
||||
|
||||
if enable_accounts_on_demand_accounts_service {
|
||||
log::info!("Accounts on demand service is enabled");
|
||||
} else {
|
||||
log::info!("Accounts on demand service is disabled");
|
||||
}
|
||||
|
||||
let timeouts = GrpcConnectionTimeouts {
|
||||
connect_timeout: Duration::from_secs(5),
|
||||
request_timeout: Duration::from_secs(5),
|
||||
subscribe_timeout: Duration::from_secs(5),
|
||||
receive_timeout: Duration::from_secs(5),
|
||||
};
|
||||
|
||||
let gprc_sources = grpc_sources
|
||||
.iter()
|
||||
.map(|s| GrpcSourceConfig::new(s.addr.clone(), s.x_token.clone(), None, timeouts.clone()))
|
||||
.collect_vec();
|
||||
|
||||
let (subscriptions, cluster_endpoint_tasks) = if use_grpc {
|
||||
info!("Creating geyser subscription...");
|
||||
|
||||
let timeouts = GrpcConnectionTimeouts {
|
||||
connect_timeout: Duration::from_secs(5),
|
||||
request_timeout: Duration::from_secs(5),
|
||||
subscribe_timeout: Duration::from_secs(5),
|
||||
receive_timeout: Duration::from_secs(5),
|
||||
};
|
||||
create_grpc_subscription(
|
||||
rpc_client.clone(),
|
||||
grpc_sources
|
||||
|
@ -196,7 +210,22 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:
|
|||
let inmemory_account_storage: Arc<dyn AccountStorageInterface> =
|
||||
Arc::new(InmemoryAccountStore::new());
|
||||
const MAX_CONNECTIONS_IN_PARALLEL: usize = 10;
|
||||
let account_service = AccountService::new(inmemory_account_storage);
|
||||
// Accounts notifications will be spurious when slots change
|
||||
// 256 seems very reasonable so that there are no account notification is missed and memory usage
|
||||
let (account_notification_sender, _) = tokio::sync::broadcast::channel(256);
|
||||
|
||||
let account_storage = if enable_accounts_on_demand_accounts_service {
|
||||
Arc::new(AccountsOnDemand::new(
|
||||
rpc_client.clone(),
|
||||
gprc_sources,
|
||||
inmemory_account_storage,
|
||||
account_notification_sender.clone(),
|
||||
))
|
||||
} else {
|
||||
inmemory_account_storage
|
||||
};
|
||||
|
||||
let account_service = AccountService::new(account_storage, account_notification_sender);
|
||||
|
||||
account_service
|
||||
.process_account_stream(account_stream.resubscribe(), blocks_notifier.resubscribe());
|
||||
|
|
|
@ -214,4 +214,11 @@ pub trait LiteRpc {
|
|||
program_id_str: String,
|
||||
config: Option<RpcProgramAccountsConfig>,
|
||||
) -> RpcResult<OptionalContext<Vec<RpcKeyedAccount>>>;
|
||||
|
||||
#[method(name = "getBalance")]
|
||||
async fn get_balance(
|
||||
&self,
|
||||
pubkey_str: String,
|
||||
config: Option<RpcContextConfig>,
|
||||
) -> RpcResult<RpcResponse<u64>>;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue