Adding quic geyser source and making it work with all raydium pools (#5)

Adding quic geyser source and making it work with all raydium pools
galactus 2024-10-01 10:15:23 +02:00 committed by GitHub
parent 31b4f35d61
commit 520d6a74b3
16 changed files with 773 additions and 68 deletions

Cargo.lock generated
View File

@ -740,6 +740,8 @@ dependencies = [
"ordered-float",
"priority-queue",
"prometheus",
"quic-geyser-client",
"quic-geyser-common",
"rand 0.7.3",
"regex",
"router-config-lib",
@ -3264,7 +3266,7 @@ dependencies = [
"futures-util",
"http 0.2.12",
"hyper 0.14.28",
"rustls 0.21.10",
"rustls 0.21.12",
"tokio",
"tokio-rustls 0.24.1",
]
@ -5303,6 +5305,40 @@ dependencies = [
"syn 2.0.58",
]
[[package]]
name = "quic-geyser-client"
version = "0.1.5"
source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#8efcc200c795b1236675b161c04e5e65e00ace48"
dependencies = [
"anyhow",
"bincode",
"log 0.4.21",
"pkcs8",
"quic-geyser-common",
"quinn",
"rcgen",
"rustls 0.21.12",
"solana-sdk",
"tokio",
]
[[package]]
name = "quic-geyser-common"
version = "0.1.5"
source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#8efcc200c795b1236675b161c04e5e65e00ace48"
dependencies = [
"anyhow",
"bincode",
"itertools 0.10.5",
"log 0.4.21",
"lz4",
"serde",
"solana-program",
"solana-sdk",
"solana-transaction-status",
"thiserror",
]
[[package]]
name = "quick-protobuf"
version = "0.8.0"
@ -5323,7 +5359,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls 0.21.10",
"rustls 0.21.12",
"thiserror",
"tokio",
"tracing",
@ -5339,7 +5375,7 @@ dependencies = [
"rand 0.8.5",
"ring 0.16.20",
"rustc-hash",
"rustls 0.21.10",
"rustls 0.21.12",
"rustls-native-certs",
"slab",
"thiserror",
@ -5762,7 +5798,7 @@ dependencies = [
"once_cell",
"percent-encoding 2.3.1",
"pin-project-lite",
"rustls 0.21.10",
"rustls 0.21.12",
"rustls-pemfile",
"serde",
"serde_json",
@ -6049,9 +6085,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.10"
version = "0.21.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e"
dependencies = [
"log 0.4.21",
"ring 0.17.8",
@ -6117,8 +6153,8 @@ dependencies = [
"sanctum-token-ratio",
"solana-program",
"solana-readonly-account",
"spl-associated-token-account 2.3.0",
"spl-token 4.0.0",
"spl-associated-token-account 1.1.3",
"spl-token 3.5.0",
"spl-token-metadata-interface",
"static_assertions",
]
@ -6231,7 +6267,7 @@ source = "git+https://github.com/igneous-labs/sanctum-solana-utils.git?rev=2d171
dependencies = [
"solana-program",
"solana-readonly-account",
"spl-associated-token-account 2.3.0",
"spl-associated-token-account 1.1.3",
]
[[package]]
@ -6284,7 +6320,7 @@ source = "git+https://github.com/igneous-labs/sanctum-solana-utils.git?rev=2d171
dependencies = [
"solana-program",
"solana-readonly-account",
"spl-token-2022 1.0.0",
"spl-token-2022 0.6.1",
]
[[package]]
@ -7408,7 +7444,7 @@ dependencies = [
"quinn",
"quinn-proto",
"rcgen",
"rustls 0.21.10",
"rustls 0.21.12",
"solana-connection-cache",
"solana-measure",
"solana-metrics",
@ -7725,7 +7761,7 @@ dependencies = [
"quinn-proto",
"rand 0.8.5",
"rcgen",
"rustls 0.21.10",
"rustls 0.21.12",
"solana-metrics",
"solana-perf",
"solana-sdk",
@ -9084,7 +9120,7 @@ version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [
"rustls 0.21.10",
"rustls 0.21.12",
"tokio",
]
@ -9158,7 +9194,7 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c"
dependencies = [
"futures-util",
"log 0.4.21",
"rustls 0.21.10",
"rustls 0.21.12",
"tokio",
"tokio-rustls 0.24.1",
"tungstenite 0.20.1",
@ -9264,7 +9300,7 @@ dependencies = [
"percent-encoding 2.3.1",
"pin-project",
"prost 0.12.4",
"rustls 0.21.10",
"rustls 0.21.12",
"rustls-native-certs",
"rustls-pemfile",
"tokio",
@ -9480,7 +9516,7 @@ dependencies = [
"httparse",
"log 0.4.21",
"rand 0.8.5",
"rustls 0.21.10",
"rustls 0.21.12",
"sha1",
"thiserror",
"url 2.5.0",

View File

@ -17,6 +17,7 @@ solana-rpc-client-api = { version = "1.17" }
mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", tag = "connector-v0.4.8" }
yellowstone-grpc-client = { version = "1.15.0", git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", tag = "v1.15.0+solana.1.17" }
yellowstone-grpc-proto = { version = "1.14.0", git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", tag = "v1.15.0+solana.1.17" }
reqwest = { version = "0.11.27", features = ["json"] }
whirlpools-client = { git = "https://github.com/blockworks-foundation/whirlpools-client/", features = ["no-entrypoint"] }
openbook-v2 = { git = "https://github.com/openbook-dex/openbook-v2", tag = "v0.2.7", features = ["no-entrypoint", "client"] }
@ -25,6 +26,8 @@ stable-swap = { version = "1.8.1", features = ["no-entrypoint", "client"] }
stable-swap-client = { version = "1.8.1" }
stable-swap-math = { version = "1.8.1" }
uint = { version = "0.9.1" }
quic-geyser-client = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "router_v1.17.29" }
quic-geyser-common = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "router_v1.17.29" }
[profile.release]
overflow-checks = true

View File

@ -87,6 +87,10 @@ yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }
tonic = { version = "0.10.2", features = ["gzip"] }
# quic
quic-geyser-client = { workspace = true }
quic-geyser-common = { workspace = true }
# compressed snapshots
lz4 = "1.24.0"

View File

@ -22,13 +22,13 @@ pub async fn main() {
let rpc_http_addr = env::var("RPC_HTTP_ADDR").expect("need rpc http url");
let snapshot_config = AccountDataSourceConfig {
region: None,
use_quic: None,
quic_address: None,
quic_sources: None,
rpc_http_url: rpc_http_addr.clone(),
rpc_support_compression: Some(false), /* no compression */
re_snapshot_interval_secs: None,
grpc_sources: vec![],
grpc_sources: Some(vec![]),
dedup_queue_size: 0,
request_timeout_in_seconds: None,
};
// Raydium

View File

@ -138,7 +138,8 @@ pub fn spawn_updater_job(
),
};
let snapshot_timeout = Instant::now() + Duration::from_secs(60 * 5);
let snapshot_timeout_in_seconds = config.snapshot_timeout_in_seconds.unwrap_or(60 * 5);
let snapshot_timeout = Instant::now() + Duration::from_secs(snapshot_timeout_in_seconds);
let listener_job = tokio_spawn(format!("edge_updater_{}", dex.name).as_str(), async move {
let mut updater = EdgeUpdater {
dex,

View File

@ -175,19 +175,28 @@ async fn main() -> anyhow::Result<()> {
}
});
if source_config.grpc_sources.len() > 1 {
error!("only one grpc source is supported ATM");
exit(-1);
if let Some(quic_sources) = &source_config.quic_sources {
info!(
"quic sources: {}",
quic_sources
.iter()
.map(|c| c.connection_string.clone())
.collect::<String>()
);
}
info!(
"grpc sources: {}",
source_config
.grpc_sources
.iter()
.map(|c| c.connection_string.clone())
.collect::<String>()
);
if let Some(grpc_sources) = source_config.grpc_sources.clone() {
info!(
"grpc sources: {}",
grpc_sources
.iter()
.map(|c| c.connection_string.clone())
.collect::<String>()
);
} else {
// currently a grpc source is still needed for the transaction watcher even when quic is used
error!("No grpc geyser sources specified");
exit(-1);
};
if config.metrics.output_http {
let prom_bind_addr = config
@ -443,8 +452,8 @@ async fn main() -> anyhow::Result<()> {
let ef = exit_sender.subscribe();
let sc = source_config.clone();
let account_update_job = tokio_spawn("geyser", async move {
if sc.use_quic.unwrap_or(false) {
error!("not supported yet");
if sc.grpc_sources.is_none() && sc.quic_sources.is_none() {
error!("No quic or grpc plugin setup");
} else {
geyser::spawn_geyser_source(
&sc,
@ -528,7 +537,7 @@ fn build_price_feed(
fn build_rpc(source_config: &AccountDataSourceConfig) -> RpcClient {
RpcClient::new_with_timeouts_and_commitment(
string_or_env(source_config.rpc_http_url.clone()),
Duration::from_secs(60), // request timeout
Duration::from_secs(source_config.request_timeout_in_seconds.unwrap_or(60)), // request timeout
CommitmentConfig::confirmed(),
Duration::from_secs(60), // confirmation timeout
)
@ -537,7 +546,7 @@ fn build_rpc(source_config: &AccountDataSourceConfig) -> RpcClient {
fn build_blocking_rpc(source_config: &AccountDataSourceConfig) -> BlockingRpcClient {
BlockingRpcClient::new_with_timeouts_and_commitment(
string_or_env(source_config.rpc_http_url.clone()),
Duration::from_secs(60), // request timeout
Duration::from_secs(source_config.request_timeout_in_seconds.unwrap_or(60)), // request timeout
CommitmentConfig::confirmed(),
Duration::from_secs(60), // confirmation timeout
)

View File

@ -27,6 +27,23 @@ lazy_static::lazy_static! {
pub static ref GRPC_TO_EDGE_SLOT_LAG: IntGaugeVec =
register_int_gauge_vec!(opts!("router_grpc_to_edge_slot_lag", "RPC Slot vs last slot used to update edges"), &["dex_name"]).unwrap();
pub static ref QUIC_ACCOUNT_WRITES: IntCounter =
register_int_counter!("quic_account_writes", "Number of account updates via Geyser QUIC").unwrap();
pub static ref QUIC_ACCOUNT_WRITE_QUEUE: IntGauge =
register_int_gauge!("quic_account_write_queue", "Items in account write queue via Geyser QUIC").unwrap();
pub static ref QUIC_DEDUP_QUEUE: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!("quic_dedup_queue", "Items in dedup queue via Geyser QUIC").unwrap();
pub static ref QUIC_SLOT_UPDATE_QUEUE: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!("quic_slot_update_queue", "Items in slot update queue via Geyser QUIC").unwrap();
pub static ref QUIC_SLOT_UPDATES: IntCounter =
register_int_counter!("quic_slot_updates", "Number of slot updates via Geyser QUIC").unwrap();
pub static ref QUIC_SNAPSHOT_ACCOUNT_WRITES: IntCounter =
register_int_counter!("quic_snapshot_account_writes", "Number of account writes from snapshot").unwrap();
pub static ref QUIC_SOURCE_CONNECTION_RETRIES: IntCounterVec =
register_int_counter_vec!(opts!("quic_source_connection_retries", "QUIC source connection retries"), &["source_name"]).unwrap();
pub static ref QUIC_NO_MESSAGE_FOR_DURATION_MS: IntGauge =
register_int_gauge!("quic_no_update_for_duration_ms", "Did not get any message from Geyser QUIC for this duration").unwrap();
pub static ref HTTP_REQUEST_TIMING: HistogramVec =
register_histogram_vec!(
histogram_opts!("router_http_request_timing", "Endpoint timing in seconds",

View File

@ -9,6 +9,8 @@ use router_feed_lib::get_program_account::FeedMetadata;
use crate::source::grpc_plugin_source;
use super::quic_plugin_source;
pub async fn spawn_geyser_source(
config: &AccountDataSourceConfig,
exit_receiver: tokio::sync::broadcast::Receiver<()>,
@ -20,16 +22,31 @@ pub async fn spawn_geyser_source(
subscribed_token_accounts: &HashSet<Pubkey>,
filters: &HashSet<Pubkey>,
) {
grpc_plugin_source::process_events(
config.clone(),
subscribed_accounts.clone(),
subscribed_programs.clone(),
subscribed_token_accounts.clone(),
filters.clone(),
account_write_sender,
Some(metadata_write_sender),
slot_sender,
exit_receiver,
)
.await;
if config.quic_sources.is_some() {
quic_plugin_source::process_events(
config.clone(),
subscribed_accounts.clone(),
subscribed_programs.clone(),
subscribed_token_accounts.clone(),
filters.clone(),
account_write_sender,
Some(metadata_write_sender),
slot_sender,
exit_receiver,
)
.await;
} else if config.grpc_sources.is_some() {
grpc_plugin_source::process_events(
config.clone(),
subscribed_accounts.clone(),
subscribed_programs.clone(),
subscribed_token_accounts.clone(),
filters.clone(),
account_write_sender,
Some(metadata_write_sender),
slot_sender,
exit_receiver,
)
.await;
}
}

View File

@ -515,13 +515,13 @@ pub async fn process_events(
async_channel::bounded::<SourceMessage>(config.dedup_queue_size);
let mut source_jobs = vec![];
let Some(grpc_sources) = config.grpc_sources.clone() else {
return;
};
// note: caller in main.rs ensures this
assert_eq!(
config.grpc_sources.len(),
1,
"only one grpc source supported"
);
for grpc_source in config.grpc_sources.clone() {
assert_eq!(grpc_sources.len(), 1, "only one grpc source supported");
for grpc_source in grpc_sources.clone() {
let msg_sender = msg_sender.clone();
let sub_accounts = subscription_accounts.clone();
let sub_programs = subscription_programs.clone();

View File

@ -88,7 +88,6 @@ pub async fn request_mint_metadata(
count.fetch_add(1, Ordering::Relaxed);
}
}
mint_accounts
});
threads.push(jh_thread);

View File

@ -1,3 +1,4 @@
pub mod geyser;
pub mod grpc_plugin_source;
pub mod mint_accounts_source;
pub mod quic_plugin_source;

View File

@ -0,0 +1,605 @@
use itertools::Itertools;
use jsonrpc_core::futures::StreamExt;
use quic_geyser_common::filters::MemcmpFilter;
use quic_geyser_common::types::connections_parameters::ConnectionParameters;
use solana_sdk::pubkey::Pubkey;
use anchor_spl::token::spl_token;
use async_channel::{Receiver, Sender};
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use std::{collections::HashMap, env, time::Duration};
use tracing::*;
use crate::metrics;
use mango_feeds_connector::{chain_data::SlotStatus, SlotUpdate};
use quic_geyser_common::message::Message;
use router_config_lib::{AccountDataSourceConfig, QuicSourceConfig};
use router_feed_lib::account_write::{AccountOrSnapshotUpdate, AccountWrite};
use router_feed_lib::get_program_account::{
get_snapshot_gma, get_snapshot_gpa, get_snapshot_gta, CustomSnapshotProgramAccounts,
FeedMetadata,
};
use solana_program::clock::Slot;
use tokio::sync::Semaphore;
const MAX_GMA_ACCOUNTS: usize = 100;
// limit number of concurrent gMA/gPA requests
const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4;
#[allow(clippy::large_enum_variant)]
pub enum SourceMessage {
QuicMessage(Message),
Snapshot(CustomSnapshotProgramAccounts),
}
pub async fn feed_data_geyser(
quic_source_config: &QuicSourceConfig,
snapshot_config: AccountDataSourceConfig,
subscribed_accounts: &HashSet<Pubkey>,
subscribed_programs: &HashSet<Pubkey>,
subscribed_token_accounts: &HashSet<Pubkey>,
sender: async_channel::Sender<SourceMessage>,
) -> anyhow::Result<()> {
let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false);
let snapshot_rpc_http_url = match &snapshot_config.rpc_http_url.chars().next().unwrap() {
'$' => env::var(&snapshot_config.rpc_http_url[1..])
.expect("reading connection string from env"),
_ => snapshot_config.rpc_http_url.clone(),
};
info!("connecting to quic source {:?}", quic_source_config);
let (quic_client, mut stream, _jh) = quic_geyser_client::non_blocking::client::Client::new(
quic_source_config.connection_string.clone(),
ConnectionParameters {
enable_gso: quic_source_config.enable_gso.unwrap_or(true),
..Default::default()
},
)
.await?;
let mut subscriptions = vec![];
let subscribed_program_filter = subscribed_programs.iter().map(|x| {
quic_geyser_common::filters::Filter::Account(quic_geyser_common::filters::AccountFilter {
owner: Some(*x),
accounts: None,
filters: None,
})
});
subscriptions.extend(subscribed_program_filter);
let subscribed_token_accounts_filter = subscribed_token_accounts.iter().map(|x| {
quic_geyser_common::filters::Filter::Account(quic_geyser_common::filters::AccountFilter {
owner: Some(Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap()),
accounts: None,
filters: Some(vec![
quic_geyser_common::filters::AccountFilterType::Datasize(165),
quic_geyser_common::filters::AccountFilterType::Memcmp(MemcmpFilter {
offset: 32,
data: quic_geyser_common::filters::MemcmpFilterData::Bytes(
x.to_bytes().to_vec(),
),
}),
]),
})
});
subscriptions.extend(subscribed_token_accounts_filter);
subscriptions.push(quic_geyser_common::filters::Filter::Account(
quic_geyser_common::filters::AccountFilter {
accounts: Some(subscribed_accounts.clone()),
owner: None,
filters: None,
},
));
subscriptions.push(quic_geyser_common::filters::Filter::Slot);
quic_client.subscribe(subscriptions).await?;
// We can't get a snapshot immediately since the finalized snapshot would be for a
// slot in the past and we'd be missing intermediate updates.
//
// Delay the request until the first slot we received all writes for becomes rooted
// to avoid that problem - partially. The rooted slot will still be larger than the
// finalized slot, so add a number of slots as a buffer.
//
// If that buffer isn't sufficient, there'll be a retry.
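// Worked example (illustrative numbers, not from this commit): if the first slot with
// complete account writes is 1_000 and rooted_to_finalized_slots is 30, the snapshot
// requests are only issued once a finalized slot above 1_030 has been observed; if the
// snapshot that comes back is still older than slot 1_000, snapshot_needed is set again
// and the buffer is widened by 25 slots before retrying.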
// The first slot that we will receive _all_ account writes for
let mut first_full_slot: u64 = u64::MAX;
// If a snapshot should be performed when ready.
let mut snapshot_needed = true;
// The highest "rooted" slot that has been seen.
let mut max_finalized_slot = 0;
// Data for slots will arrive out of order. This value defines how many
// slots after a slot was marked "rooted" we assume it'll not receive
// any more account write information.
//
// This is important for the write_version mapping (to know when slots can
// be dropped).
let max_out_of_order_slots = 40;
// Number of slots that we expect "finalized" commitment to lag
// behind "rooted". This matters for getProgramAccounts based snapshots,
// which will have "finalized" commitment.
let mut rooted_to_finalized_slots = 30;
let (snapshot_gma_sender, mut snapshot_gma_receiver) = tokio::sync::mpsc::unbounded_channel();
// TODO log buffer size
// The plugin sends a ping every 5s or so
let fatal_idle_timeout = Duration::from_secs(15);
let mut re_snapshot_interval = tokio::time::interval(Duration::from_secs(
snapshot_config
.re_snapshot_interval_secs
.unwrap_or(60 * 60 * 12),
));
re_snapshot_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
re_snapshot_interval.tick().await;
// Highest slot that an account write came in for.
let mut newest_write_slot: u64 = 0;
let mut last_message_received_at = Instant::now();
loop {
tokio::select! {
update = stream.recv() => {
let Some(mut message) = update
else {
anyhow::bail!("geyser plugin has closed the stream");
};
// use account and slot updates to trigger snapshot loading
match &mut message {
Message::SlotMsg(slot_update) => {
trace!("received slot update for slot {}", slot_update.slot);
let commitment_config = slot_update.commitment_config;
debug!(
"slot_update: {} ({:?})",
slot_update.slot,
commitment_config
);
if commitment_config.is_finalized() {
if first_full_slot == u64::MAX {
// TODO: is this equivalent to before? what was highest_write_slot?
first_full_slot = slot_update.slot + 1;
}
// TODO rename rooted to finalized
if slot_update.slot > max_finalized_slot {
max_finalized_slot = slot_update.slot;
}
let waiting_for_snapshot_slot = max_finalized_slot <= first_full_slot + rooted_to_finalized_slots;
if waiting_for_snapshot_slot {
debug!("waiting for snapshot slot: rooted={}, first_full={}, slot={}", max_finalized_slot, first_full_slot, slot_update.slot);
}
if snapshot_needed && !waiting_for_snapshot_slot {
snapshot_needed = false;
debug!("snapshot slot reached - setting up snapshot tasks");
let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS));
info!("Requesting snapshot from gMA for {} filter accounts", subscribed_accounts.len());
for pubkey_chunk in subscribed_accounts.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() {
let rpc_http_url = snapshot_rpc_http_url.clone();
let account_ids = pubkey_chunk.map(|x| *x).collect_vec();
let sender = snapshot_gma_sender.clone();
let permits = permits_parallel_rpc_requests.clone();
tokio::spawn(async move {
let _permit = permits.acquire().await.unwrap();
let snapshot = get_snapshot_gma(&rpc_http_url, &account_ids).await;
match sender.send(snapshot) {
Ok(_) => {}
Err(_) => {
warn!("Could not send snapshot, quic has probably reconnected");
}
}
});
}
info!("Requesting snapshot from gPA for {} program filter accounts", subscribed_programs.len());
for program_id in subscribed_programs {
let rpc_http_url = snapshot_rpc_http_url.clone();
let program_id = *program_id;
let sender = snapshot_gma_sender.clone();
let permits = permits_parallel_rpc_requests.clone();
tokio::spawn(async move {
let _permit = permits.acquire().await.unwrap();
let snapshot = get_snapshot_gpa(&rpc_http_url, &program_id, use_compression).await;
match sender.send(snapshot) {
Ok(_) => {}
Err(_) => {
warn!("Could not send snapshot, quic has probably reconnected");
}
}
});
}
info!("Requesting snapshot from gTA for {} owners filter accounts", subscribed_token_accounts.len());
for owner_id in subscribed_token_accounts {
let rpc_http_url = snapshot_rpc_http_url.clone();
let owner_id = owner_id.clone();
let sender = snapshot_gma_sender.clone();
let permits = permits_parallel_rpc_requests.clone();
tokio::spawn(async move {
let _permit = permits.acquire().await.unwrap();
let snapshot = get_snapshot_gta(&rpc_http_url, &owner_id).await;
match sender.send(snapshot) {
Ok(_) => {}
Err(_) => {
warn!("Could not send snapshot, quic has probably reconnected");
}
}
});
}
}
}
},
Message::AccountMsg(info) => {
let slot = info.slot_identifier.slot;
trace!("received account update for slot {}", slot);
if slot < first_full_slot {
// Don't try to process data for slots where we may have missed writes:
// We could not map the write_version correctly for them.
continue;
}
if slot > newest_write_slot {
newest_write_slot = slot;
debug!(
"newest_write_slot: {}",
newest_write_slot
);
} else if max_finalized_slot > 0 && info.slot_identifier.slot < max_finalized_slot - max_out_of_order_slots {
anyhow::bail!("received write {} slots back from max rooted slot {}", max_finalized_slot - slot, max_finalized_slot);
}
},
_ => {
// ignore all other quic update types
}
}
let elapsed = last_message_received_at.elapsed().as_millis();
metrics::QUIC_NO_MESSAGE_FOR_DURATION_MS.set(elapsed as i64);
last_message_received_at = Instant::now();
// send the incremental updates to the channel
sender.send(SourceMessage::QuicMessage(message)).await.expect("send success");
},
snapshot_message = snapshot_gma_receiver.recv() => {
let Some(snapshot_result) = snapshot_message
else {
anyhow::bail!("snapshot channel closed");
};
let snapshot = snapshot_result?;
debug!("snapshot (program={}, m_accounts={}) is for slot {}, first full slot was {}",
snapshot.program_id.map(|x| x.to_string()).unwrap_or("none".to_string()),
snapshot.accounts.len(),
snapshot.slot,
first_full_slot);
if snapshot.slot < first_full_slot {
warn!(
"snapshot is too old: has slot {}, expected {} minimum - request another one but also use this snapshot",
snapshot.slot,
first_full_slot
);
// try again in another 25 slots
snapshot_needed = true;
rooted_to_finalized_slots += 25;
}
// New - Don't care if the snapshot is old, we want startup to work anyway
// If an edge is not working properly, it will be disabled when swapping it
sender
.send(SourceMessage::Snapshot(snapshot))
.await
.expect("send success");
},
_ = tokio::time::sleep(fatal_idle_timeout) => {
anyhow::bail!("geyser plugin hasn't sent a message in too long");
}
_ = re_snapshot_interval.tick() => {
info!("Re-snapshot hack");
snapshot_needed = true;
}
}
}
}
pub async fn process_events(
config: AccountDataSourceConfig,
subscription_accounts: HashSet<Pubkey>,
subscription_programs: HashSet<Pubkey>,
subscription_token_accounts: HashSet<Pubkey>,
filters: HashSet<Pubkey>,
account_write_queue_sender: async_channel::Sender<AccountOrSnapshotUpdate>,
metadata_write_queue_sender: Option<async_channel::Sender<FeedMetadata>>,
slot_queue_sender: async_channel::Sender<SlotUpdate>,
mut exit: tokio::sync::broadcast::Receiver<()>,
) {
// Subscribe to geyser
let (msg_sender, msg_receiver) =
async_channel::bounded::<SourceMessage>(config.dedup_queue_size);
let mut source_jobs = vec![];
let Some(quic_sources) = config.quic_sources.clone() else {
return;
};
// note: caller in main.rs ensures this
assert_eq!(quic_sources.len(), 1, "only one quic source supported");
for quic_source in quic_sources.clone() {
let msg_sender = msg_sender.clone();
let sub_accounts = subscription_accounts.clone();
let sub_programs = subscription_programs.clone();
let sub_token_accounts = subscription_token_accounts.clone();
let cfg = config.clone();
source_jobs.push(tokio::spawn(async move {
let mut error_count = 0;
let mut last_error = Instant::now();
// Continuously reconnect on failure
loop {
let out = feed_data_geyser(
&quic_source,
cfg.clone(),
&sub_accounts,
&sub_programs,
&sub_token_accounts,
msg_sender.clone(),
);
if last_error.elapsed() > Duration::from_secs(60 * 10) {
error_count = 0;
}
else if error_count > 10 {
error!("error during communication with the geyser plugin - retried too many time, exiting..");
break;
}
match out.await {
// happy case!
Err(err) => {
warn!(
"error during communication with the geyser plugin - retrying: {:?}",
err
);
last_error = Instant::now();
error_count += 1;
}
// this should never happen
Ok(_) => {
error!("feed_data must return an error, not OK - continue");
last_error = Instant::now();
error_count += 1;
}
}
metrics::QUIC_SOURCE_CONNECTION_RETRIES
.with_label_values(&[&quic_source.name])
.inc();
tokio::time::sleep(std::time::Duration::from_secs(
quic_source.retry_connection_sleep_secs,
))
.await;
}
}));
}
// slot -> (pubkey -> write_version)
//
// To avoid forwarding duplicate updates downstream, we track the latest write_version
// for each (slot, pubkey). If an already-seen write_version comes in, it can be safely
// discarded.
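// Example (illustrative): an account write for pubkey X at slot 1_234 with write_version 7
// is forwarded only if the highest version recorded so far for (1_234, X) is below 7;
// entries for slots more than latest_write_retention slots behind the incoming write's
// slot are pruned on every update.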
let mut latest_write = HashMap::<Slot, HashMap<Pubkey, u64>>::new();
// Number of slots to retain in latest_write
let latest_write_retention = 50;
let mut source_jobs: futures::stream::FuturesUnordered<_> = source_jobs.into_iter().collect();
loop {
tokio::select! {
_ = source_jobs.next() => {
warn!("shutting down quic_plugin_source because subtask failed...");
break;
},
_ = exit.recv() => {
warn!("shutting down quic_plugin_source...");
break;
}
msg = msg_receiver.recv() => {
match msg {
Ok(msg) => {
process_account_updated_from_sources(&account_write_queue_sender,
&slot_queue_sender,
&msg_receiver,
msg,
&mut latest_write,
latest_write_retention,
&metdata_write_queue_sender,
&filters,
).await ;
}
Err(e) => {
warn!("failed to process quic event: {:?}", e);
break;
}
};
},
};
}
// close all channels to notify downstream CSPs of error
account_write_queue_sender.close();
metadata_write_queue_sender.map(|s| s.close());
slot_queue_sender.close();
}
// consume channel with snapshot and update data
async fn process_account_updated_from_sources(
account_write_queue_sender: &Sender<AccountOrSnapshotUpdate>,
slot_queue_sender: &Sender<SlotUpdate>,
msg_receiver: &Receiver<SourceMessage>,
msg: SourceMessage,
latest_write: &mut HashMap<Slot, HashMap<Pubkey, u64>>,
// in slots
latest_write_retention: u64,
// metric_account_writes: &mut MetricU64,
// metric_account_queue: &mut MetricU64,
// metric_dedup_queue: &mut MetricU64,
// metric_slot_queue: &mut MetricU64,
// metric_slot_updates: &mut MetricU64,
// metric_snapshots: &mut MetricU64,
// metric_snapshot_account_writes: &mut MetricU64,
metdata_write_queue_sender: &Option<Sender<FeedMetadata>>,
filters: &HashSet<Pubkey>,
) {
let metadata_sender = |msg| {
if let Some(sender) = &metadata_write_queue_sender {
sender.send_blocking(msg)
} else {
Ok(())
}
};
metrics::QUIC_DEDUP_QUEUE.set(msg_receiver.len() as i64);
match msg {
SourceMessage::QuicMessage(message) => {
match message {
Message::AccountMsg(account_message) => {
metrics::QUIC_ACCOUNT_WRITES.inc();
metrics::QUIC_ACCOUNT_WRITE_QUEUE.set(account_write_queue_sender.len() as i64);
let solana_account = account_message.solana_account();
// Skip writes that a different server has already sent
let pubkey_writes = latest_write
.entry(account_message.slot_identifier.slot)
.or_default();
if !filters.contains(&account_message.pubkey) {
return;
}
let writes = pubkey_writes.entry(account_message.pubkey).or_insert(0);
if account_message.write_version <= *writes {
return;
}
*writes = account_message.write_version;
latest_write.retain(|&k, _| {
k >= account_message.slot_identifier.slot - latest_write_retention
});
account_write_queue_sender
.send(AccountOrSnapshotUpdate::AccountUpdate(AccountWrite {
pubkey: account_message.pubkey,
slot: account_message.slot_identifier.slot,
write_version: account_message.write_version,
lamports: account_message.lamports,
owner: account_message.owner,
executable: account_message.executable,
rent_epoch: account_message.rent_epoch,
data: solana_account.data,
}))
.await
.expect("send success");
}
Message::SlotMsg(slot_message) => {
metrics::QUIC_SLOT_UPDATES.inc();
metrics::QUIC_SLOT_UPDATE_QUEUE.set(slot_queue_sender.len() as i64);
let status = if slot_message.commitment_config.is_processed() {
SlotStatus::Processed
} else if slot_message.commitment_config.is_confirmed() {
SlotStatus::Confirmed
} else {
SlotStatus::Rooted
};
let slot_update = SlotUpdate {
slot: slot_message.slot,
parent: Some(slot_message.parent),
status,
};
slot_queue_sender
.send(slot_update)
.await
.expect("send success");
}
_ => {
// ignore update
}
}
}
SourceMessage::Snapshot(update) => {
let label = if let Some(prg) = update.program_id {
if prg == spl_token::ID {
"gpa(tokens)"
} else {
"gpa"
}
} else {
"gma"
};
metrics::ACCOUNT_SNAPSHOTS
.with_label_values(&[&label])
.inc();
debug!(
"processing snapshot for program_id {} -> size={} & missing size={}...",
update
.program_id
.map(|x| x.to_string())
.unwrap_or("".to_string()),
update.accounts.len(),
update.missing_accounts.len()
);
if let Err(e) = metadata_sender(FeedMetadata::SnapshotStart(update.program_id)) {
warn!("failed to send feed matadata event: {}", e);
}
let mut updated_accounts = vec![];
for account in update.accounts {
metrics::QUIC_SNAPSHOT_ACCOUNT_WRITES.inc();
metrics::QUIC_ACCOUNT_WRITE_QUEUE.set(account_write_queue_sender.len() as i64);
if !filters.contains(&account.pubkey) {
continue;
}
updated_accounts.push(account);
}
account_write_queue_sender
.send(AccountOrSnapshotUpdate::SnapshotUpdate(updated_accounts))
.await
.expect("send success");
for account in update.missing_accounts {
if let Err(e) = metadata_sender(FeedMetadata::InvalidAccount(account)) {
warn!("failed to send feed matadata event: {}", e);
}
}
debug!("processing snapshot done");
if let Err(e) = metadata_sender(FeedMetadata::SnapshotEnd(update.program_id)) {
warn!("failed to send feed matadata event: {}", e);
}
}
}
}

View File

@ -1,5 +1,3 @@
use std::any::Any;
use std::panic;
use anchor_lang::Id;
use anchor_spl::token::Token;
use anchor_spl::token_2022::spl_token_2022::extension::transfer_fee::TransferFeeConfig;
@ -14,6 +12,8 @@ use solana_program::clock::Clock;
use solana_program::pubkey::Pubkey;
use solana_program::sysvar::Sysvar;
use solana_sdk::account::ReadableAccount;
use std::any::Any;
use std::panic;
use router_lib::dex::{DexEdge, DexEdgeIdentifier};
@ -189,8 +189,7 @@ pub fn swap_base_output(
output_mint: &Option<TransferFeeConfig>,
amount_out: u64,
) -> anyhow::Result<(u64, u64, u64)> {
let res = panic::catch_unwind(||
{
let res = panic::catch_unwind(|| {
let pool_state = pool;
let block_timestamp = pool_state.open_time + 1; // TODO FAS this is supposed to be the clock
if !pool_state.get_status_by_bit(PoolStatusBitIndex::Swap)
@ -269,7 +268,6 @@ pub fn swap_base_output(
} else {
anyhow::bail!("Something went wrong in raydium cp")
}
}
pub fn get_transfer_fee(

View File

@ -11,6 +11,14 @@ pub struct GrpcSourceConfig {
pub tls: Option<TlsConfig>,
}
#[derive(Clone, Debug, Default, serde_derive::Deserialize)]
pub struct QuicSourceConfig {
pub name: String,
pub connection_string: String,
pub retry_connection_sleep_secs: u64,
pub enable_gso: Option<bool>,
}
#[derive(Clone, Debug, Default, serde_derive::Deserialize)]
pub struct TlsConfig {
pub ca_cert_path: String,
@ -36,6 +44,7 @@ pub struct Config {
pub safety_checks: Option<SafetyCheckConfig>,
pub hot_mints: Option<HotMintsConfig>,
pub debug_config: Option<DebugConfig>,
pub snapshot_timeout_in_seconds: Option<u64>,
}
impl Config {
@ -78,16 +87,15 @@ pub struct InfinityConfig {
#[derive(Clone, Debug, Default, serde_derive::Deserialize)]
pub struct AccountDataSourceConfig {
pub region: Option<String>,
pub use_quic: Option<bool>,
#[serde(deserialize_with = "serde_opt_string_or_env", default)]
pub quic_address: Option<String>,
pub quic_sources: Option<Vec<QuicSourceConfig>>,
#[serde(deserialize_with = "serde_string_or_env")]
pub rpc_http_url: String,
// does RPC node support getProgramAccountsCompressed
pub rpc_support_compression: Option<bool>,
pub re_snapshot_interval_secs: Option<u64>,
pub grpc_sources: Vec<GrpcSourceConfig>,
pub grpc_sources: Option<Vec<GrpcSourceConfig>>,
pub dedup_queue_size: usize,
pub request_timeout_in_seconds: Option<u64>,
}
#[derive(Clone, Debug, serde_derive::Deserialize)]
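For reference, the new fields could be wired into a router config roughly as in the sketch below. The [sources] table name, addresses and numeric values are illustrative assumptions, not taken from this commit, and a grpc source entry (omitted here) is still expected alongside, since the transaction watcher only supports grpc:
snapshot_timeout_in_seconds = 900        # optional, edge updater falls back to 300
[sources]                                # assumed section name for AccountDataSourceConfig
dedup_queue_size = 20000
rpc_http_url = "$RPC_HTTP_URL"           # a leading '$' resolves the value from the environment
rpc_support_compression = false
request_timeout_in_seconds = 60
[[sources.quic_sources]]
name = "quic01"
connection_string = "127.0.0.1:10800"    # placeholder address
retry_connection_sleep_secs = 10
enable_gso = true                        # optional, defaults to true in feed_data_geyser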

View File

@ -185,7 +185,11 @@ pub async fn process_tx_events(
let (msg_sender, msg_receiver) = async_channel::bounded::<ExecTx>(config.dedup_queue_size);
let mut source_jobs = vec![];
for grpc_source in config.grpc_sources.clone() {
let Some(grpc_sources) = config.grpc_sources.clone() else {
panic!("There should be atleast one grpc source specified for grpc tx watcher");
};
for grpc_source in grpc_sources.clone() {
let msg_sender = msg_sender.clone();
// Make TLS config if configured

View File

@ -208,8 +208,7 @@ async fn run_all_swap_from_dump(dump_name: &str) -> Result<Result<(), Error>, Er
quote.output_amount != received_out_amount
};
if unexpected_in_amount || unexpected_out_amount
{
if unexpected_in_amount || unexpected_out_amount {
debug_print_ix(
&mut success,
&mut index,
@ -270,7 +269,11 @@ async fn debug_print_ix(
error!(
"Faulty swapping #{} quote{}: \r\n{} -> {} ({} -> {})\r\n (successfully run {} swap)",
index,
if quote.is_exact_out { " (ExactOut)" } else { "" },
if quote.is_exact_out {
" (ExactOut)"
} else {
""
},
quote.input_mint,
quote.output_mint,
quote.input_amount,