Changes to activate all raydium pools on production (#15)
* Changes to activate all raydium pools on production * chaning quic client and repanicking on lags * fixing prometheus server * Removing CORs from prometheus server * removing unwanted error message from prometheus * Adding region dfw for dallas * updating name in config file
This commit is contained in:
parent
2d8ada1117
commit
65a7a822bc
|
@ -1409,6 +1409,12 @@ dependencies = [
|
|||
"generic-array 0.14.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "circular-buffer"
|
||||
version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b67261db007b5f4cf8cba393c1a5c511a5cc072339ce16e12aeba1d7b9b77946"
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "2.34.0"
|
||||
|
@ -5330,7 +5336,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "quic-geyser-client"
|
||||
version = "0.1.5"
|
||||
source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#8efcc200c795b1236675b161c04e5e65e00ace48"
|
||||
source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#594077ccebfde826920192c17b0e501bb185650e"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bincode",
|
||||
|
@ -5347,10 +5353,11 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "quic-geyser-common"
|
||||
version = "0.1.5"
|
||||
source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#8efcc200c795b1236675b161c04e5e65e00ace48"
|
||||
source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#594077ccebfde826920192c17b0e501bb185650e"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bincode",
|
||||
"circular-buffer",
|
||||
"itertools 0.10.5",
|
||||
"log 0.4.21",
|
||||
"lz4",
|
||||
|
|
|
@ -272,7 +272,7 @@ impl EdgeState {
|
|||
}
|
||||
|
||||
pub fn cached_price_exact_out_for(&self, out_amount: u64) -> Option<(f64, f64)> {
|
||||
if !self.is_valid_out() {
|
||||
if !self.is_valid() {
|
||||
return None;
|
||||
}
|
||||
|
||||
|
@ -304,22 +304,6 @@ impl EdgeState {
|
|||
true
|
||||
}
|
||||
|
||||
pub fn is_valid_out(&self) -> bool {
|
||||
if !self.is_valid {
|
||||
return false;
|
||||
}
|
||||
|
||||
if self.cooldown_until.is_some() {
|
||||
// Do not check time here !
|
||||
// We will reset "cooldown until" on first account update coming after cooldown
|
||||
// So if this is not reset yet, it means that we didn't change anything
|
||||
// No reason to be working again
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
pub fn reset_cooldown(&mut self) {
|
||||
self.cooldown_event += 0;
|
||||
self.cooldown_until = None;
|
||||
|
|
|
@ -16,7 +16,7 @@ use std::time::{Duration, Instant};
|
|||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Dex {
|
||||
|
@ -242,14 +242,17 @@ impl EdgeUpdater {
|
|||
Some(since) => {
|
||||
if since.elapsed() > max_lag_duration {
|
||||
panic!(
|
||||
"Lagging a lot {} for more than {}s, exiting..",
|
||||
"Lagging a lot {} for more than {}s, for dex {}..",
|
||||
lag,
|
||||
max_lag_duration.as_secs()
|
||||
max_lag_duration.as_secs(),
|
||||
self.dex.name,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
return;
|
||||
} else if state.slot_excessive_lagging_since.is_some() {
|
||||
state.slot_excessive_lagging_since = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -333,7 +336,9 @@ impl EdgeUpdater {
|
|||
};
|
||||
|
||||
state.received_account.insert(pk);
|
||||
state.latest_slot_pending = slot;
|
||||
if state.latest_slot_pending < slot {
|
||||
state.latest_slot_pending = slot;
|
||||
}
|
||||
|
||||
self.check_readiness();
|
||||
|
||||
|
@ -398,7 +403,7 @@ impl EdgeUpdater {
|
|||
state.latest_slot_processed = state.latest_slot_pending;
|
||||
|
||||
if started_at.elapsed() > Duration::from_millis(100) {
|
||||
info!(
|
||||
debug!(
|
||||
"{} - refresh {} - took - {:?}",
|
||||
self.dex.name,
|
||||
refreshed_edges.len(),
|
||||
|
|
|
@ -99,6 +99,14 @@ async fn main() -> anyhow::Result<()> {
|
|||
let config = Config::load(&args[1])?;
|
||||
let router_version = RouterVersion::OverestimateAmount;
|
||||
|
||||
if config.metrics.output_http {
|
||||
let prom_bind_addr = config
|
||||
.metrics
|
||||
.prometheus_address
|
||||
.clone()
|
||||
.expect("prometheus_address must be set");
|
||||
PrometheusSync::sync(prom_bind_addr);
|
||||
}
|
||||
let hot_mints = Arc::new(RwLock::new(HotMintsCache::new(&config.hot_mints)));
|
||||
|
||||
let mango_data = match mango::mango_fetcher::fetch_mango_data().await {
|
||||
|
@ -200,14 +208,6 @@ async fn main() -> anyhow::Result<()> {
|
|||
exit(-1);
|
||||
};
|
||||
|
||||
if config.metrics.output_http {
|
||||
let prom_bind_addr = config
|
||||
.metrics
|
||||
.prometheus_address
|
||||
.clone()
|
||||
.expect("prometheus_address must be set");
|
||||
let _prometheus = PrometheusSync::sync(prom_bind_addr);
|
||||
}
|
||||
if config.metrics.output_stdout {
|
||||
warn!("metrics output to stdout is not supported yet");
|
||||
}
|
||||
|
|
|
@ -1,17 +1,15 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use axum::{routing, Router};
|
||||
use prometheus::{Encoder, TextEncoder};
|
||||
use tokio::net::{TcpListener, ToSocketAddrs};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::{
|
||||
io::AsyncWriteExt,
|
||||
net::{TcpListener, TcpStream, ToSocketAddrs},
|
||||
};
|
||||
use tracing::error;
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::server::errors::AppError;
|
||||
|
||||
pub struct PrometheusSync;
|
||||
|
||||
impl PrometheusSync {
|
||||
fn create_response(payload: &str) -> String {
|
||||
fn create_response(payload: String) -> String {
|
||||
format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
|
||||
payload.len(),
|
||||
|
@ -19,7 +17,7 @@ impl PrometheusSync {
|
|||
)
|
||||
}
|
||||
|
||||
async fn handle_stream(stream: &mut TcpStream) -> anyhow::Result<()> {
|
||||
async fn get_prometheus_stream() -> Result<String, AppError> {
|
||||
let mut metrics_buffer = Vec::new();
|
||||
let encoder = TextEncoder::new();
|
||||
|
||||
|
@ -29,29 +27,22 @@ impl PrometheusSync {
|
|||
.unwrap();
|
||||
|
||||
let metrics_buffer = String::from_utf8(metrics_buffer).unwrap();
|
||||
let response = Self::create_response(&metrics_buffer);
|
||||
|
||||
stream.writable().await?;
|
||||
stream.write_all(response.as_bytes()).await?;
|
||||
|
||||
stream.flush().await?;
|
||||
|
||||
Ok(())
|
||||
Ok(Self::create_response(metrics_buffer))
|
||||
}
|
||||
|
||||
pub fn sync(addr: impl ToSocketAddrs + Send + 'static) -> JoinHandle<anyhow::Result<()>> {
|
||||
tokio::spawn(async move {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
|
||||
loop {
|
||||
let Ok((mut stream, _addr)) = listener.accept().await else {
|
||||
error!("Error accepting prometheus stream");
|
||||
tokio::time::sleep(Duration::from_millis(1)).await;
|
||||
continue;
|
||||
};
|
||||
let mut router: Router<()> = Router::new();
|
||||
router = router.route("/metrics", routing::get(Self::get_prometheus_stream));
|
||||
|
||||
let _ = Self::handle_stream(&mut stream).await;
|
||||
}
|
||||
let handle = axum::serve(listener, router);
|
||||
|
||||
info!("Prometheus Server started");
|
||||
|
||||
handle.await.expect("Prometheus Server failed");
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
pub mod alt_provider;
|
||||
mod errors;
|
||||
pub mod errors;
|
||||
pub mod hash_provider;
|
||||
pub mod http_server;
|
||||
pub mod live_account_provider;
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
snapshot_timeout_in_seconds = 900
|
||||
|
||||
[infinity]
|
||||
enabled = true
|
||||
|
||||
|
@ -22,8 +24,8 @@ add_mango_tokens = false
|
|||
[raydium]
|
||||
enabled = true
|
||||
mints = []
|
||||
take_all_mints = false
|
||||
add_mango_tokens = true
|
||||
take_all_mints = true
|
||||
add_mango_tokens = false
|
||||
|
||||
[raydium_cp]
|
||||
enabled = true
|
||||
|
@ -77,6 +79,7 @@ dedup_queue_size = 50000
|
|||
rpc_http_url = "$RPC_HTTP_URL"
|
||||
rpc_support_compression = true
|
||||
re_snapshot_interval_secs = 1200
|
||||
request_timeout_in_seconds = 300
|
||||
|
||||
[[sources.grpc_sources]]
|
||||
name = "router-other"
|
||||
|
@ -84,12 +87,40 @@ connection_string = "$RPC_HTTP_URL_WITHOUT_TOKEN"
|
|||
token = "$RPC_TOKEN"
|
||||
retry_connection_sleep_secs = 30
|
||||
|
||||
[[sources.quic_sources]]
|
||||
name = "quic-client"
|
||||
connection_string = "$RPC_QUIC_URL"
|
||||
retry_connection_sleep_secs = 1
|
||||
enable_gso = false
|
||||
|
||||
[[sources]]
|
||||
region = "dfw"
|
||||
dedup_queue_size = 50000
|
||||
rpc_http_url = "$DFW_RPC_HTTP_URL"
|
||||
rpc_support_compression = true
|
||||
re_snapshot_interval_secs = 1200
|
||||
request_timeout_in_seconds = 300
|
||||
|
||||
[[sources.grpc_sources]]
|
||||
name = "router-dfw"
|
||||
connection_string = "$DFW_RPC_HTTP_URL_WITHOUT_TOKEN"
|
||||
token = "$AMS_RPC_TOKEN"
|
||||
retry_connection_sleep_secs = 30
|
||||
|
||||
[[sources.quic_sources]]
|
||||
name = "quic-client-dfw"
|
||||
connection_string = "$DFW_RPC_QUIC_URL"
|
||||
retry_connection_sleep_secs = 1
|
||||
enable_gso = false
|
||||
|
||||
|
||||
[[sources]]
|
||||
region = "ams"
|
||||
dedup_queue_size = 50000
|
||||
rpc_http_url = "$AMS_RPC_HTTP_URL"
|
||||
rpc_support_compression = true
|
||||
re_snapshot_interval_secs = 1200
|
||||
request_timeout_in_seconds = 300
|
||||
|
||||
[[sources.grpc_sources]]
|
||||
name = "router-ams"
|
||||
|
@ -97,6 +128,12 @@ connection_string = "$AMS_RPC_HTTP_URL_WITHOUT_TOKEN"
|
|||
token = "$AMS_RPC_TOKEN"
|
||||
retry_connection_sleep_secs = 30
|
||||
|
||||
[[sources.quic_sources]]
|
||||
name = "quic-client-ams "
|
||||
connection_string = "$AMS_RPC_QUIC_URL"
|
||||
retry_connection_sleep_secs = 1
|
||||
enable_gso = false
|
||||
|
||||
[price_feed]
|
||||
birdeye_token = "$BIRDEYE_TOKEN"
|
||||
refresh_interval_secs = 1200 # every 20 min
|
||||
|
|
4
fly.toml
4
fly.toml
|
@ -10,8 +10,8 @@ kill_timeout = "30s"
|
|||
cmd = ["autobahn-router", "/usr/local/bin/template-config.toml"]
|
||||
|
||||
[[vm]]
|
||||
size = "shared-cpu-4x"
|
||||
memory = "8gb"
|
||||
size = "performance-4x"
|
||||
memory = "16gb"
|
||||
|
||||
[[restart]]
|
||||
policy = "always"
|
||||
|
|
|
@ -14,6 +14,7 @@ pub struct GrpcSourceConfig {
|
|||
#[derive(Clone, Debug, Default, serde_derive::Deserialize)]
|
||||
pub struct QuicSourceConfig {
|
||||
pub name: String,
|
||||
#[serde(deserialize_with = "serde_string_or_env")]
|
||||
pub connection_string: String,
|
||||
pub retry_connection_sleep_secs: u64,
|
||||
pub enable_gso: Option<bool>,
|
||||
|
|
Loading…
Reference in New Issue