Add prometheus metrics to fills

This commit is contained in:
Riordan Panayides 2022-10-05 23:50:21 +01:00
parent cb95574b8a
commit ae7ffc1dcb
5 changed files with 148 additions and 2 deletions

91
Cargo.lock generated
View File

@ -779,6 +779,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "buf_redux"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f"
dependencies = [
"memchr",
"safemem",
]
[[package]]
name = "bumpalo"
version = "3.11.0"
@ -2909,6 +2919,16 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mime_guess"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
dependencies = [
"mime 0.3.16",
"unicase 2.6.0",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -3026,6 +3046,24 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "multipart"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00dec633863867f29cb39df64a397cdf4a6354708ddd7759f70c7fb51c5f9182"
dependencies = [
"buf_redux",
"httparse",
"log 0.4.17",
"mime 0.3.16",
"mime_guess",
"quick-error",
"rand 0.8.5",
"safemem",
"tempfile",
"twoway",
]
[[package]]
name = "native-tls"
version = "0.2.10"
@ -3888,6 +3926,12 @@ dependencies = [
"percent-encoding 2.1.0",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quick-protobuf"
version = "0.8.0"
@ -4523,6 +4567,12 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "scoped-tls"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -4737,6 +4787,7 @@ dependencies = [
"tokio",
"tokio-tungstenite",
"toml",
"warp",
"ws",
]
@ -7074,6 +7125,15 @@ dependencies = [
"webpki-roots",
]
[[package]]
name = "twoway"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1"
dependencies = [
"memchr",
]
[[package]]
name = "typeable"
version = "0.1.2"
@ -7289,6 +7349,37 @@ dependencies = [
"try-lock",
]
[[package]]
name = "warp"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed7b8be92646fc3d18b06147664ebc5f48d222686cb11a8755e561a735aacc6d"
dependencies = [
"bytes 1.2.1",
"futures-channel",
"futures-util",
"headers",
"http",
"hyper 0.14.20",
"log 0.4.17",
"mime 0.3.16",
"mime_guess",
"multipart",
"percent-encoding 2.1.0",
"pin-project",
"rustls-pemfile 0.2.1",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tokio-util 0.7.2",
"tower-service",
"tracing",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"

View File

@ -7,8 +7,8 @@ kill_timeout = 5
image = "us-docker.pkg.dev/mango-markets/gcr.io/mango-geyser-services:latest"
[processes]
fills = "service-mango-fills fills-config.toml"
pnl = "service-mango-pnl pnl-config.toml"
fills = "service-mango-fills fills-config.toml"
pnl = "service-mango-pnl pnl-config.toml"
[[services]]
processes = ["fills"]
@ -35,3 +35,7 @@ pnl = "service-mango-pnl pnl-config.toml"
type = "connections"
hard_limit = 1024
soft_limit = 1024
[metrics]
port = 9091
path = "/metrics"

View File

@ -122,6 +122,26 @@ impl Metrics {
},
}
}
pub fn get_registry_vec(&self) -> Vec<(String, String)> {
let mut vec: Vec<(String, String)> = Vec::new();
let metrics = self.registry.read().unwrap();
for (name, value) in metrics.iter() {
let value_str = match value {
Value::U64(v) => {
format!("{}", v.load(atomic::Ordering::Acquire))
}
Value::I64(v) => {
format!("{}", v.load(atomic::Ordering::Acquire))
}
Value::String(v) => {
format!("{}", v.lock().unwrap())
}
};
vec.push((name.clone(), value_str));
}
vec
}
}
pub fn start() -> Metrics {

View File

@ -22,3 +22,4 @@ async-channel = "1.6"
async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
warp = "0.3"

View File

@ -14,6 +14,8 @@ use solana_geyser_connector_lib::{
grpc_plugin_source, metrics, websocket_source, SourceConfig,
};
use crate::metrics::Metrics;
use warp::{Filter, Rejection, Reply};
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>;
@ -68,6 +70,26 @@ async fn handle_connection(
Ok(())
}
async fn handle_metrics(metrics: Metrics) -> Result<impl Reply, Rejection> {
let labels = HashMap::from([("process", "fills")]);
let label_strings_vec: Vec<String> = labels
.iter()
.map(|(name, value)| format!("{}=\"{}\"", name, value))
.collect();
let lines: Vec<String> = metrics
.get_registry_vec()
.iter()
.map(|(name, value)| format!("{}{{{}}} {}", name, label_strings_vec.join(","), value))
.collect();
Ok(lines.join("\n"))
}
pub fn with_metrics(
metrics: Metrics,
) -> impl Filter<Extract = (Metrics,), Error = std::convert::Infallible> + Clone {
warp::any().map(move || metrics.clone())
}
#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub source: SourceConfig,
@ -93,6 +115,14 @@ async fn main() -> anyhow::Result<()> {
solana_logger::setup_with_default("info");
let metrics_tx = metrics::start();
let metrics_route = warp::path!("metrics")
.and(with_metrics(metrics_tx.clone()))
.and_then(handle_metrics);
// serve prometheus metrics endpoint
tokio::spawn(async move {
warp::serve(metrics_route).run(([0, 0, 0, 0], 9091)).await;
});
let (account_write_queue_sender, slot_queue_sender, fill_receiver) =
fill_event_filter::init(config.markets.clone()).await?;