2024-02-07 06:51:22 -08:00
|
|
|
use crate::endpoint_stremers::EndpointStreaming;
|
2024-03-26 06:32:50 -07:00
|
|
|
use crate::grpc::grpc_accounts_streaming::create_grpc_account_streaming;
|
|
|
|
use crate::grpc::grpc_utils::connect_with_timeout_hacked;
|
2023-12-22 05:42:20 -08:00
|
|
|
use crate::grpc_multiplex::{
|
2024-02-07 06:51:22 -08:00
|
|
|
create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_processed_slots_subscription,
|
2023-08-31 03:34:13 -07:00
|
|
|
};
|
2024-03-25 09:44:57 -07:00
|
|
|
use anyhow::Context;
|
|
|
|
use futures::StreamExt;
|
2024-02-07 06:51:22 -08:00
|
|
|
use geyser_grpc_connector::GrpcSourceConfig;
|
2023-08-31 03:34:13 -07:00
|
|
|
use itertools::Itertools;
|
2024-03-01 08:21:48 -08:00
|
|
|
use log::trace;
|
2023-08-31 03:34:13 -07:00
|
|
|
use solana_client::nonblocking::rpc_client::RpcClient;
|
2024-03-26 06:32:50 -07:00
|
|
|
use solana_lite_rpc_core::structures::account_data::AccountNotificationMessage;
|
2024-02-14 10:20:25 -08:00
|
|
|
use solana_lite_rpc_core::structures::account_filter::AccountFilters;
|
2023-08-31 03:34:13 -07:00
|
|
|
use solana_lite_rpc_core::{
|
2023-12-22 05:42:20 -08:00
|
|
|
structures::produced_block::{ProducedBlock, TransactionInfo},
|
2023-08-31 03:34:13 -07:00
|
|
|
AnyhowJoinHandle,
|
|
|
|
};
|
2024-01-25 01:29:14 -08:00
|
|
|
use solana_sdk::program_utils::limited_deserialize;
|
|
|
|
use solana_sdk::vote::instruction::VoteInstruction;
|
2023-08-31 03:34:13 -07:00
|
|
|
use solana_sdk::{
|
2023-09-04 06:09:51 -07:00
|
|
|
borsh0_10::try_from_slice_unchecked,
|
2023-08-31 03:34:13 -07:00
|
|
|
commitment_config::CommitmentConfig,
|
|
|
|
compute_budget::{self, ComputeBudgetInstruction},
|
|
|
|
hash::Hash,
|
|
|
|
instruction::CompiledInstruction,
|
|
|
|
message::{
|
|
|
|
v0::{self, MessageAddressTableLookup},
|
|
|
|
MessageHeader, VersionedMessage,
|
|
|
|
},
|
|
|
|
pubkey::Pubkey,
|
|
|
|
signature::Signature,
|
|
|
|
transaction::TransactionError,
|
|
|
|
};
|
|
|
|
use solana_transaction_status::{Reward, RewardType};
|
2024-03-01 08:21:48 -08:00
|
|
|
use std::cell::OnceCell;
|
2024-03-25 09:44:57 -07:00
|
|
|
use std::collections::HashMap;
|
2024-02-15 12:49:15 -08:00
|
|
|
use std::sync::Arc;
|
2024-04-02 06:07:01 -07:00
|
|
|
use tokio::sync::{broadcast, Notify};
|
2024-03-01 08:21:48 -08:00
|
|
|
use tracing::trace_span;
|
2024-03-26 06:32:50 -07:00
|
|
|
use yellowstone_grpc_client::GeyserGrpcClient;
|
2024-03-25 09:44:57 -07:00
|
|
|
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
|
|
|
use yellowstone_grpc_proto::geyser::{
|
|
|
|
CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterSlots, SubscribeUpdateSlot,
|
|
|
|
};
|
2023-12-22 05:42:20 -08:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
use crate::rpc_polling::vote_accounts_and_cluster_info_polling::{
|
|
|
|
poll_cluster_info, poll_vote_accounts,
|
|
|
|
};
|
2024-03-01 08:21:48 -08:00
|
|
|
use solana_lite_rpc_core::solana_utils::hash_from_str;
|
2024-02-22 05:28:29 -08:00
|
|
|
use solana_lite_rpc_core::structures::produced_block::ProducedBlockInner;
|
2024-02-15 12:49:15 -08:00
|
|
|
use yellowstone_grpc_proto::prelude::SubscribeUpdateBlock;
|
2023-08-31 03:34:13 -07:00
|
|
|
|
2024-01-25 01:29:14 -08:00
|
|
|
/// grpc version of ProducedBlock mapping
|
|
|
|
pub fn from_grpc_block_update(
|
2023-08-31 03:34:13 -07:00
|
|
|
block: SubscribeUpdateBlock,
|
|
|
|
commitment_config: CommitmentConfig,
|
2023-09-05 01:28:37 -07:00
|
|
|
) -> ProducedBlock {
|
2024-03-01 08:21:48 -08:00
|
|
|
let num_transactions = block.transactions.len();
|
|
|
|
let _span = trace_span!("from_grpc_block_update", ?block.slot, ?num_transactions).entered();
|
2023-08-31 03:34:13 -07:00
|
|
|
let txs: Vec<TransactionInfo> = block
|
|
|
|
.transactions
|
|
|
|
.into_iter()
|
|
|
|
.filter_map(|tx| {
|
2024-01-12 07:39:19 -08:00
|
|
|
let meta = tx.meta?;
|
2023-08-31 03:34:13 -07:00
|
|
|
|
2024-01-12 07:39:19 -08:00
|
|
|
let transaction = tx.transaction?;
|
2023-08-31 03:34:13 -07:00
|
|
|
|
2024-01-12 07:39:19 -08:00
|
|
|
let message = transaction.message?;
|
2023-08-31 03:34:13 -07:00
|
|
|
|
2024-01-12 07:39:19 -08:00
|
|
|
let header = message.header?;
|
2023-08-31 03:34:13 -07:00
|
|
|
|
2024-03-01 08:21:48 -08:00
|
|
|
let signature = {
|
|
|
|
let sig_bytes: [u8; 64] = tx.signature.try_into().expect("must map to signature");
|
|
|
|
Signature::from(sig_bytes)
|
|
|
|
};
|
2023-08-31 03:34:13 -07:00
|
|
|
|
|
|
|
let err = meta.err.map(|x| {
|
|
|
|
bincode::deserialize::<TransactionError>(&x.err)
|
|
|
|
.expect("TransactionError should be deserialized")
|
|
|
|
});
|
|
|
|
|
|
|
|
let compute_units_consumed = meta.compute_units_consumed;
|
2024-02-01 09:49:51 -08:00
|
|
|
let account_keys: Vec<Pubkey> = message
|
|
|
|
.account_keys
|
|
|
|
.into_iter()
|
2024-03-01 08:21:48 -08:00
|
|
|
.map(|key_bytes| {
|
|
|
|
let slice: &[u8] = key_bytes.as_slice();
|
|
|
|
Pubkey::try_from(slice).expect("must map to pubkey")
|
2024-02-01 09:49:51 -08:00
|
|
|
})
|
|
|
|
.collect();
|
2023-08-31 03:34:13 -07:00
|
|
|
|
|
|
|
let message = VersionedMessage::V0(v0::Message {
|
|
|
|
header: MessageHeader {
|
|
|
|
num_required_signatures: header.num_required_signatures as u8,
|
|
|
|
num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8,
|
|
|
|
num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8,
|
|
|
|
},
|
2024-02-01 09:49:51 -08:00
|
|
|
account_keys: account_keys.clone(),
|
2023-08-31 03:34:13 -07:00
|
|
|
recent_blockhash: Hash::new(&message.recent_blockhash),
|
|
|
|
instructions: message
|
|
|
|
.instructions
|
|
|
|
.into_iter()
|
|
|
|
.map(|ix| CompiledInstruction {
|
|
|
|
program_id_index: ix.program_id_index as u8,
|
|
|
|
accounts: ix.accounts,
|
|
|
|
data: ix.data,
|
|
|
|
})
|
|
|
|
.collect(),
|
|
|
|
address_table_lookups: message
|
|
|
|
.address_table_lookups
|
|
|
|
.into_iter()
|
|
|
|
.map(|table| {
|
2024-03-01 08:21:48 -08:00
|
|
|
let slice: &[u8] = table.account_key.as_slice();
|
|
|
|
let account_key = Pubkey::try_from(slice).expect("must map to pubkey");
|
2023-08-31 03:34:13 -07:00
|
|
|
MessageAddressTableLookup {
|
2024-03-01 08:21:48 -08:00
|
|
|
account_key,
|
2023-08-31 03:34:13 -07:00
|
|
|
writable_indexes: table.writable_indexes,
|
|
|
|
readonly_indexes: table.readonly_indexes,
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.collect(),
|
|
|
|
});
|
|
|
|
|
2024-03-01 08:21:48 -08:00
|
|
|
let (cu_requested, prioritization_fees) = map_compute_budget_instructions(&message);
|
2023-08-31 03:34:13 -07:00
|
|
|
|
2024-01-25 01:29:14 -08:00
|
|
|
let is_vote_transaction = message.instructions().iter().any(|i| {
|
|
|
|
i.program_id(message.static_account_keys())
|
|
|
|
.eq(&solana_sdk::vote::program::id())
|
|
|
|
&& limited_deserialize::<VoteInstruction>(&i.data)
|
|
|
|
.map(|vi| vi.is_simple_vote())
|
|
|
|
.unwrap_or(false)
|
|
|
|
});
|
|
|
|
|
2024-02-01 09:49:51 -08:00
|
|
|
let readable_accounts = account_keys
|
|
|
|
.iter()
|
|
|
|
.enumerate()
|
|
|
|
.filter(|(index, _)| !message.is_maybe_writable(*index))
|
|
|
|
.map(|(_, pk)| *pk)
|
|
|
|
.collect();
|
|
|
|
let writable_accounts = account_keys
|
|
|
|
.iter()
|
|
|
|
.enumerate()
|
|
|
|
.filter(|(index, _)| message.is_maybe_writable(*index))
|
|
|
|
.map(|(_, pk)| *pk)
|
|
|
|
.collect();
|
|
|
|
|
2024-02-06 08:59:46 -08:00
|
|
|
let address_lookup_tables = message
|
|
|
|
.address_table_lookups()
|
|
|
|
.map(|x| x.to_vec())
|
|
|
|
.unwrap_or_default();
|
|
|
|
|
2023-08-31 03:34:13 -07:00
|
|
|
Some(TransactionInfo {
|
2024-03-01 08:21:48 -08:00
|
|
|
signature,
|
2024-01-25 01:29:14 -08:00
|
|
|
is_vote: is_vote_transaction,
|
2023-08-31 03:34:13 -07:00
|
|
|
err,
|
|
|
|
cu_requested,
|
|
|
|
prioritization_fees,
|
|
|
|
cu_consumed: compute_units_consumed,
|
2024-03-01 08:21:48 -08:00
|
|
|
recent_blockhash: *message.recent_blockhash(),
|
|
|
|
message,
|
2024-02-01 09:49:51 -08:00
|
|
|
readable_accounts,
|
|
|
|
writable_accounts,
|
2024-02-06 08:59:46 -08:00
|
|
|
address_lookup_tables,
|
2023-08-31 03:34:13 -07:00
|
|
|
})
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
let rewards = block.rewards.map(|rewards| {
|
|
|
|
rewards
|
|
|
|
.rewards
|
|
|
|
.into_iter()
|
|
|
|
.map(|reward| Reward {
|
|
|
|
pubkey: reward.pubkey.to_owned(),
|
|
|
|
lamports: reward.lamports,
|
|
|
|
post_balance: reward.post_balance,
|
|
|
|
reward_type: match reward.reward_type() {
|
|
|
|
yellowstone_grpc_proto::prelude::RewardType::Unspecified => None,
|
|
|
|
yellowstone_grpc_proto::prelude::RewardType::Fee => Some(RewardType::Fee),
|
|
|
|
yellowstone_grpc_proto::prelude::RewardType::Rent => Some(RewardType::Rent),
|
|
|
|
yellowstone_grpc_proto::prelude::RewardType::Staking => {
|
|
|
|
Some(RewardType::Staking)
|
|
|
|
}
|
|
|
|
yellowstone_grpc_proto::prelude::RewardType::Voting => Some(RewardType::Voting),
|
|
|
|
},
|
|
|
|
commission: None,
|
|
|
|
})
|
|
|
|
.collect_vec()
|
|
|
|
});
|
|
|
|
|
2023-10-04 06:25:06 -07:00
|
|
|
let leader_id = if let Some(rewards) = &rewards {
|
2023-08-31 03:34:13 -07:00
|
|
|
rewards
|
|
|
|
.iter()
|
|
|
|
.find(|reward| Some(RewardType::Fee) == reward.reward_type)
|
|
|
|
.map(|leader_reward| leader_reward.pubkey.clone())
|
|
|
|
} else {
|
|
|
|
None
|
|
|
|
};
|
|
|
|
|
2024-02-22 05:28:29 -08:00
|
|
|
let inner = ProducedBlockInner {
|
2023-09-21 06:44:58 -07:00
|
|
|
transactions: txs,
|
2023-08-31 03:34:13 -07:00
|
|
|
block_height: block
|
|
|
|
.block_height
|
|
|
|
.map(|block_height| block_height.block_height)
|
|
|
|
.unwrap(),
|
|
|
|
block_time: block.block_time.map(|time| time.timestamp).unwrap() as u64,
|
2024-03-01 08:21:48 -08:00
|
|
|
blockhash: hash_from_str(&block.blockhash).expect("valid blockhash"),
|
|
|
|
previous_blockhash: hash_from_str(&block.parent_blockhash).expect("valid blockhash"),
|
2023-08-31 03:34:13 -07:00
|
|
|
leader_id,
|
|
|
|
parent_slot: block.parent_slot,
|
|
|
|
slot: block.slot,
|
2023-10-04 06:25:06 -07:00
|
|
|
rewards,
|
2024-02-22 05:28:29 -08:00
|
|
|
};
|
|
|
|
ProducedBlock::new(inner, commitment_config)
|
2023-08-31 03:34:13 -07:00
|
|
|
}
|
|
|
|
|
2024-03-01 08:21:48 -08:00
|
|
|
fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option<u32>, Option<u64>) {
|
|
|
|
let cu_requested_cell: OnceCell<u32> = OnceCell::new();
|
|
|
|
let legacy_cu_requested_cell: OnceCell<u32> = OnceCell::new();
|
|
|
|
|
|
|
|
let prioritization_fees_cell: OnceCell<u64> = OnceCell::new();
|
|
|
|
let legacy_prio_fees_cell: OnceCell<u64> = OnceCell::new();
|
|
|
|
|
|
|
|
for compute_budget_ins in message.instructions().iter().filter(|instruction| {
|
|
|
|
instruction
|
|
|
|
.program_id(message.static_account_keys())
|
|
|
|
.eq(&compute_budget::id())
|
|
|
|
}) {
|
|
|
|
if let Ok(budget_ins) =
|
|
|
|
try_from_slice_unchecked::<ComputeBudgetInstruction>(compute_budget_ins.data.as_slice())
|
|
|
|
{
|
|
|
|
match budget_ins {
|
|
|
|
// aka cu requested
|
|
|
|
ComputeBudgetInstruction::SetComputeUnitLimit(limit) => {
|
|
|
|
cu_requested_cell
|
|
|
|
.set(limit)
|
|
|
|
.expect("cu_limit must be set only once");
|
|
|
|
}
|
|
|
|
// aka prio fees
|
|
|
|
ComputeBudgetInstruction::SetComputeUnitPrice(price) => {
|
|
|
|
prioritization_fees_cell
|
|
|
|
.set(price)
|
|
|
|
.expect("prioritization_fees must be set only once");
|
|
|
|
}
|
|
|
|
// legacy
|
|
|
|
ComputeBudgetInstruction::RequestUnitsDeprecated {
|
|
|
|
units,
|
|
|
|
additional_fee,
|
|
|
|
} => {
|
|
|
|
let _ = legacy_cu_requested_cell.set(units);
|
|
|
|
if additional_fee > 0 {
|
|
|
|
let _ = legacy_prio_fees_cell.set(((units * 1000) / additional_fee) as u64);
|
|
|
|
};
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
trace!("skip compute budget instruction");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
let cu_requested = cu_requested_cell
|
|
|
|
.get()
|
|
|
|
.or(legacy_cu_requested_cell.get())
|
|
|
|
.cloned();
|
|
|
|
let prioritization_fees = prioritization_fees_cell
|
|
|
|
.get()
|
|
|
|
.or(legacy_prio_fees_cell.get())
|
|
|
|
.cloned();
|
|
|
|
(cu_requested, prioritization_fees)
|
|
|
|
}
|
|
|
|
|
2024-03-25 09:44:57 -07:00
|
|
|
pub fn create_block_processing_task(
|
|
|
|
grpc_addr: String,
|
|
|
|
grpc_x_token: Option<String>,
|
|
|
|
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
|
|
|
|
commitment_level: CommitmentLevel,
|
2024-03-28 08:03:03 -07:00
|
|
|
mut exit_notify: broadcast::Receiver<()>,
|
2024-03-25 09:44:57 -07:00
|
|
|
) -> AnyhowJoinHandle {
|
|
|
|
tokio::spawn(async move {
|
2024-04-02 06:07:01 -07:00
|
|
|
'main_loop: loop {
|
2024-03-25 09:44:57 -07:00
|
|
|
let mut blocks_subs = HashMap::new();
|
|
|
|
blocks_subs.insert(
|
|
|
|
"block_client".to_string(),
|
|
|
|
SubscribeRequestFilterBlocks {
|
|
|
|
account_include: Default::default(),
|
|
|
|
include_transactions: Some(true),
|
|
|
|
include_accounts: Some(false),
|
|
|
|
include_entries: Some(false),
|
|
|
|
},
|
|
|
|
);
|
|
|
|
|
|
|
|
// connect to grpc
|
|
|
|
let mut client =
|
|
|
|
connect_with_timeout_hacked(grpc_addr.clone(), grpc_x_token.clone()).await?;
|
2024-04-02 06:07:01 -07:00
|
|
|
let mut stream = tokio::select! {
|
2024-03-28 08:03:03 -07:00
|
|
|
res = client
|
2024-03-25 09:44:57 -07:00
|
|
|
.subscribe_once(
|
|
|
|
HashMap::new(),
|
|
|
|
Default::default(),
|
|
|
|
HashMap::new(),
|
|
|
|
Default::default(),
|
|
|
|
blocks_subs,
|
|
|
|
Default::default(),
|
|
|
|
Some(commitment_level),
|
|
|
|
Default::default(),
|
|
|
|
None,
|
2024-03-28 08:03:03 -07:00
|
|
|
) => {
|
|
|
|
res?
|
|
|
|
},
|
|
|
|
_ = exit_notify.recv() => {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
};
|
2024-03-25 09:44:57 -07:00
|
|
|
|
2024-03-27 09:09:38 -07:00
|
|
|
loop {
|
|
|
|
tokio::select! {
|
|
|
|
message = stream.next() => {
|
|
|
|
let Some(Ok(message)) = message else {
|
|
|
|
break;
|
|
|
|
};
|
2024-03-25 09:44:57 -07:00
|
|
|
|
2024-03-27 09:09:38 -07:00
|
|
|
let Some(update) = message.update_oneof else {
|
|
|
|
continue;
|
|
|
|
};
|
2024-03-25 09:44:57 -07:00
|
|
|
|
2024-03-27 09:09:38 -07:00
|
|
|
match update {
|
|
|
|
UpdateOneof::Block(block) => {
|
|
|
|
log::trace!(
|
|
|
|
"received block, hash: {} slot: {}",
|
|
|
|
block.blockhash,
|
|
|
|
block.slot
|
|
|
|
);
|
|
|
|
block_sx
|
|
|
|
.send(block)
|
|
|
|
.await
|
|
|
|
.context("Problem sending on block channel")?;
|
|
|
|
}
|
|
|
|
UpdateOneof::Ping(_) => {
|
|
|
|
log::trace!("GRPC Ping");
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
log::trace!("unknown GRPC notification");
|
|
|
|
}
|
|
|
|
};
|
|
|
|
},
|
2024-03-28 08:03:03 -07:00
|
|
|
_ = exit_notify.recv() => {
|
|
|
|
break 'main_loop;
|
2024-03-25 09:44:57 -07:00
|
|
|
}
|
2024-03-27 09:09:38 -07:00
|
|
|
}
|
2024-03-25 09:44:57 -07:00
|
|
|
}
|
2024-03-27 09:09:38 -07:00
|
|
|
drop(stream);
|
|
|
|
drop(client);
|
2024-03-25 09:44:57 -07:00
|
|
|
log::error!("Grpc block subscription broken (resubscribing)");
|
|
|
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
|
|
}
|
2024-04-02 06:07:01 -07:00
|
|
|
Ok(())
|
2024-03-25 09:44:57 -07:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
// not used
|
|
|
|
pub fn create_slot_stream_task(
|
|
|
|
grpc_addr: String,
|
|
|
|
grpc_x_token: Option<String>,
|
|
|
|
slot_sx: async_channel::Sender<SubscribeUpdateSlot>,
|
|
|
|
commitment_level: CommitmentLevel,
|
|
|
|
) -> AnyhowJoinHandle {
|
|
|
|
tokio::spawn(async move {
|
|
|
|
loop {
|
|
|
|
let mut slots = HashMap::new();
|
|
|
|
slots.insert(
|
|
|
|
"client_slot".to_string(),
|
|
|
|
SubscribeRequestFilterSlots {
|
|
|
|
filter_by_commitment: Some(true),
|
|
|
|
},
|
|
|
|
);
|
|
|
|
|
|
|
|
// connect to grpc
|
|
|
|
let mut client =
|
|
|
|
GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?;
|
|
|
|
let mut stream = client
|
|
|
|
.subscribe_once(
|
|
|
|
slots,
|
|
|
|
Default::default(),
|
|
|
|
HashMap::new(),
|
|
|
|
Default::default(),
|
|
|
|
HashMap::new(),
|
|
|
|
Default::default(),
|
|
|
|
Some(commitment_level),
|
|
|
|
Default::default(),
|
|
|
|
None,
|
|
|
|
)
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
while let Some(message) = stream.next().await {
|
|
|
|
let message = message?;
|
|
|
|
|
|
|
|
let Some(update) = message.update_oneof else {
|
|
|
|
continue;
|
|
|
|
};
|
|
|
|
|
|
|
|
match update {
|
|
|
|
UpdateOneof::Slot(slot) => {
|
|
|
|
slot_sx
|
|
|
|
.send(slot)
|
|
|
|
.await
|
|
|
|
.context("Problem sending on block channel")?;
|
|
|
|
}
|
|
|
|
UpdateOneof::Ping(_) => {
|
|
|
|
log::trace!("GRPC Ping");
|
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
log::trace!("unknown GRPC notification");
|
|
|
|
}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
log::error!("Grpc block subscription broken (resubscribing)");
|
|
|
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2023-12-01 02:12:54 -08:00
|
|
|
pub fn create_grpc_subscription(
|
|
|
|
rpc_client: Arc<RpcClient>,
|
2023-12-22 05:42:20 -08:00
|
|
|
grpc_sources: Vec<GrpcSourceConfig>,
|
2024-02-14 10:20:25 -08:00
|
|
|
accounts_filter: AccountFilters,
|
2023-12-01 02:12:54 -08:00
|
|
|
) -> anyhow::Result<(EndpointStreaming, Vec<AnyhowJoinHandle>)> {
|
|
|
|
let (cluster_info_sx, cluster_info_notifier) = tokio::sync::broadcast::channel(10);
|
|
|
|
let (va_sx, vote_account_notifier) = tokio::sync::broadcast::channel(10);
|
|
|
|
|
2023-12-22 05:42:20 -08:00
|
|
|
// processed slot is required to keep up with leader schedule
|
|
|
|
let (slot_multiplex_channel, jh_multiplex_slotstream) =
|
2024-02-07 06:51:22 -08:00
|
|
|
create_grpc_multiplex_processed_slots_subscription(grpc_sources.clone());
|
2023-12-01 02:12:54 -08:00
|
|
|
|
2024-03-22 11:22:38 -07:00
|
|
|
let (block_multiplex_channel, blockmeta_channel, jh_multiplex_blockstream) =
|
2024-02-14 10:20:25 -08:00
|
|
|
create_grpc_multiplex_blocks_subscription(grpc_sources.clone());
|
2023-08-31 03:34:13 -07:00
|
|
|
|
2024-02-07 06:51:22 -08:00
|
|
|
let cluster_info_polling = poll_cluster_info(rpc_client.clone(), cluster_info_sx);
|
|
|
|
let vote_accounts_polling = poll_vote_accounts(rpc_client.clone(), va_sx);
|
2024-02-14 10:20:25 -08:00
|
|
|
// accounts
|
|
|
|
if !accounts_filter.is_empty() {
|
2024-03-26 06:32:50 -07:00
|
|
|
let (account_sender, accounts_stream) =
|
|
|
|
tokio::sync::broadcast::channel::<AccountNotificationMessage>(1024);
|
|
|
|
let account_jh = create_grpc_account_streaming(
|
|
|
|
grpc_sources,
|
|
|
|
accounts_filter,
|
|
|
|
account_sender,
|
|
|
|
Arc::new(Notify::new()),
|
|
|
|
);
|
2024-02-14 10:20:25 -08:00
|
|
|
let streamers = EndpointStreaming {
|
|
|
|
blocks_notifier: block_multiplex_channel,
|
2024-03-22 11:22:38 -07:00
|
|
|
blockinfo_notifier: blockmeta_channel,
|
2024-02-14 10:20:25 -08:00
|
|
|
slot_notifier: slot_multiplex_channel,
|
|
|
|
cluster_info_notifier,
|
|
|
|
vote_account_notifier,
|
2024-03-26 06:32:50 -07:00
|
|
|
processed_account_stream: Some(accounts_stream),
|
2024-02-14 10:20:25 -08:00
|
|
|
};
|
|
|
|
|
|
|
|
let endpoint_tasks = vec![
|
|
|
|
jh_multiplex_slotstream,
|
|
|
|
jh_multiplex_blockstream,
|
|
|
|
cluster_info_polling,
|
|
|
|
vote_accounts_polling,
|
|
|
|
account_jh,
|
|
|
|
];
|
|
|
|
Ok((streamers, endpoint_tasks))
|
|
|
|
} else {
|
|
|
|
let streamers = EndpointStreaming {
|
|
|
|
blocks_notifier: block_multiplex_channel,
|
2024-03-22 11:22:38 -07:00
|
|
|
blockinfo_notifier: blockmeta_channel,
|
2024-02-14 10:20:25 -08:00
|
|
|
slot_notifier: slot_multiplex_channel,
|
|
|
|
cluster_info_notifier,
|
|
|
|
vote_account_notifier,
|
|
|
|
processed_account_stream: None,
|
|
|
|
};
|
|
|
|
|
|
|
|
let endpoint_tasks = vec![
|
|
|
|
jh_multiplex_slotstream,
|
|
|
|
jh_multiplex_blockstream,
|
|
|
|
cluster_info_polling,
|
|
|
|
vote_accounts_polling,
|
|
|
|
];
|
|
|
|
Ok((streamers, endpoint_tasks))
|
|
|
|
}
|
2023-08-31 03:34:13 -07:00
|
|
|
}
|