diff --git a/CHANGELOG.md b/CHANGELOG.md index f7f3a55..2852640 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The minor version will be incremented upon a breaking change and the patch versi ### Features - client: add `GeyserGrpcClient::subscribe_once2` ([#195](https://github.com/rpcpool/yellowstone-grpc/pull/195)). +- kafka: add metrics (stats, sent, recv) ([#196](https://github.com/rpcpool/yellowstone-grpc/pull/196)). ### Fixes diff --git a/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs b/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs index e801f59..9d854c5 100644 --- a/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs +++ b/yellowstone-grpc-kafka/src/bin/grpc-kafka.rs @@ -5,12 +5,7 @@ use { future::{BoxFuture, FutureExt}, stream::StreamExt, }, - rdkafka::{ - config::ClientConfig, - consumer::{Consumer, StreamConsumer}, - message::Message, - producer::{FutureProducer, FutureRecord}, - }, + rdkafka::{config::ClientConfig, consumer::Consumer, message::Message, producer::FutureRecord}, sha2::{Digest, Sha256}, std::{net::SocketAddr, sync::Arc, time::Duration}, tokio::{ @@ -102,12 +97,12 @@ impl ArgsAction { } // input - let consumer: StreamConsumer = kafka_config.create()?; + let consumer = prom::kafka::StatsContext::create_stream_consumer(&kafka_config) + .context("failed to create kafka consumer")?; consumer.subscribe(&[&config.kafka_input])?; // output - let kafka: FutureProducer = kafka_config - .create() + let kafka = prom::kafka::StatsContext::create_future_producer(&kafka_config) .context("failed to create kafka producer")?; // dedup @@ -131,6 +126,7 @@ impl ArgsAction { }, message = consumer.recv() => message, }?; + prom::kafka::recv_inc(); trace!( "received message with key: {:?}", message.key().and_then(|k| std::str::from_utf8(k).ok()) @@ -171,11 +167,13 @@ impl ArgsAction { debug!("kafka send message with key: {key}, result: {result:?}"); result?.map_err(|(error, _message)| error)?; + prom::kafka::sent_inc(prom::kafka::GprcMessageKind::Unknown); 
Ok::<(), anyhow::Error>(()) } Err(error) => Err(error.0.into()), } } else { + prom::kafka::dedup_inc(); Ok(()) } }); @@ -207,8 +205,7 @@ impl ArgsAction { } // Connect to kafka - let kafka: FutureProducer = kafka_config - .create() + let kafka = prom::kafka::StatsContext::create_future_producer(&kafka_config) .context("failed to create kafka producer")?; // Create gRPC client & subscribe @@ -244,23 +241,23 @@ impl ArgsAction { match message { Some(message) => { - if matches!(message.update_oneof, Some(UpdateOneof::Ping(_))) { - continue; - } - - let slot = match &message.update_oneof { - Some(UpdateOneof::Account(msg)) => msg.slot, - Some(UpdateOneof::Slot(msg)) => msg.slot, - Some(UpdateOneof::Transaction(msg)) => msg.slot, - Some(UpdateOneof::Block(msg)) => msg.slot, - Some(UpdateOneof::Ping(_)) => unreachable!("Ping message not expected"), - Some(UpdateOneof::BlockMeta(msg)) => msg.slot, - Some(UpdateOneof::Entry(msg)) => msg.slot, + let payload = message.encode_to_vec(); + let message = match &message.update_oneof { + Some(value) => value, None => unreachable!("Expect valid message"), }; - let payload = message.encode_to_vec(); + let slot = match message { + UpdateOneof::Account(msg) => msg.slot, + UpdateOneof::Slot(msg) => msg.slot, + UpdateOneof::Transaction(msg) => msg.slot, + UpdateOneof::Block(msg) => msg.slot, + UpdateOneof::Ping(_) => continue, + UpdateOneof::BlockMeta(msg) => msg.slot, + UpdateOneof::Entry(msg) => msg.slot, + }; let hash = Sha256::digest(&payload); let key = format!("{slot}_{}", const_hex::encode(hash)); + let prom_kind = prom::kafka::GprcMessageKind::from(message); let record = FutureRecord::to(&config.kafka_topic) .key(&key) @@ -272,9 +269,9 @@ impl ArgsAction { let result = future.await; debug!("kafka send message with key: {key}, result: {result:?}"); - Ok::<(i32, i64), anyhow::Error>( - result?.map_err(|(error, _message)| error)?, - ) + let result = result?.map_err(|(error, _message)| error)?; + prom::kafka::sent_inc(prom_kind); + 
Ok::<(i32, i64), anyhow::Error>(result) }); if send_tasks.len() >= config.kafka_queue_size { tokio::select! { @@ -311,7 +308,8 @@ impl ArgsAction { let (grpc_tx, grpc_shutdown) = GrpcService::run(config.listen, config.channel_capacity)?; - let consumer: StreamConsumer = kafka_config.create()?; + let consumer = prom::kafka::StatsContext::create_stream_consumer(&kafka_config) + .context("failed to create kafka consumer")?; consumer.subscribe(&[&config.kafka_topic])?; loop { @@ -319,6 +317,7 @@ impl ArgsAction { _ = &mut shutdown => break, message = consumer.recv() => message?, }; + prom::kafka::recv_inc(); debug!( "received message with key: {:?}", message.key().and_then(|k| std::str::from_utf8(k).ok()) diff --git a/yellowstone-grpc-kafka/src/prom.rs b/yellowstone-grpc-kafka/src/prom.rs index 5b0513e..eda261f 100644 --- a/yellowstone-grpc-kafka/src/prom.rs +++ b/yellowstone-grpc-kafka/src/prom.rs @@ -5,18 +5,36 @@ use { service::{make_service_fn, service_fn}, Body, Request, Response, Server, StatusCode, }, - prometheus::{IntCounterVec, Opts, Registry, TextEncoder}, + prometheus::{GaugeVec, IntCounter, IntCounterVec, Opts, Registry, TextEncoder}, std::{net::SocketAddr, sync::Once}, tracing::{error, info}, }; lazy_static::lazy_static! 
{ - pub static ref REGISTRY: Registry = Registry::new(); + static ref REGISTRY: Registry = Registry::new(); static ref VERSION: IntCounterVec = IntCounterVec::new( Opts::new("version", "Plugin version info"), &["buildts", "git", "package", "proto", "rustc", "solana", "version"] ).unwrap(); + + static ref KAFKA_STATS: GaugeVec = GaugeVec::new( + Opts::new("kafka_stats", "librdkafka metrics"), + &["broker", "metric"] + ).unwrap(); + + static ref KAFKA_DEDUP_TOTAL: IntCounter = IntCounter::new( + "kafka_dedup_total", "Total number of deduplicated messages" + ).unwrap(); + + static ref KAFKA_RECV_TOTAL: IntCounter = IntCounter::new( + "kafka_recv_total", "Total number of received messages" + ).unwrap(); + + static ref KAFKA_SENT_TOTAL: IntCounterVec = IntCounterVec::new( + Opts::new("kafka_sent_total", "Total number of uploaded messages by type"), + &["kind"] + ).unwrap(); } pub fn run_server(address: Option<SocketAddr>) -> anyhow::Result<()> { @@ -30,6 +48,10 @@ pub fn run_server(address: Option<SocketAddr>) -> anyhow::Result<()> { }; } register!(VERSION); + register!(KAFKA_STATS); + register!(KAFKA_DEDUP_TOTAL); + register!(KAFKA_RECV_TOTAL); + register!(KAFKA_SENT_TOTAL); VERSION .with_label_values(&[ @@ -82,3 +104,141 @@ fn not_found_handler() -> Response<Body> { .body(Body::empty()) .unwrap() } + +pub mod kafka { + use { + super::{KAFKA_DEDUP_TOTAL, KAFKA_RECV_TOTAL, KAFKA_SENT_TOTAL, KAFKA_STATS}, + rdkafka::{ + client::ClientContext, + config::{ClientConfig, FromClientConfigAndContext}, + consumer::{ConsumerContext, StreamConsumer}, + error::KafkaResult, + producer::FutureProducer, + statistics::Statistics, + }, + yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof, + }; + + #[derive(Debug, Default, Clone, Copy)] + pub struct StatsContext; + + impl ClientContext for StatsContext { + fn stats(&self, statistics: Statistics) { + for (name, broker) in statistics.brokers { + macro_rules! 
set_value { + ($name:expr, $value:expr) => { + KAFKA_STATS + .with_label_values(&[&name, $name]) + .set($value as f64); + }; + } + + set_value!("outbuf_cnt", broker.outbuf_cnt); + set_value!("outbuf_msg_cnt", broker.outbuf_msg_cnt); + set_value!("waitresp_cnt", broker.waitresp_cnt); + set_value!("waitresp_msg_cnt", broker.waitresp_msg_cnt); + set_value!("tx", broker.tx); + set_value!("txerrs", broker.txerrs); + set_value!("txretries", broker.txretries); + set_value!("req_timeouts", broker.req_timeouts); + + if let Some(window) = broker.int_latency { + set_value!("int_latency.min", window.min); + set_value!("int_latency.max", window.max); + set_value!("int_latency.avg", window.avg); + set_value!("int_latency.sum", window.sum); + set_value!("int_latency.cnt", window.cnt); + set_value!("int_latency.stddev", window.stddev); + set_value!("int_latency.hdrsize", window.hdrsize); + set_value!("int_latency.p50", window.p50); + set_value!("int_latency.p75", window.p75); + set_value!("int_latency.p90", window.p90); + set_value!("int_latency.p95", window.p95); + set_value!("int_latency.p99", window.p99); + set_value!("int_latency.p99_99", window.p99_99); + set_value!("int_latency.outofrange", window.outofrange); + } + + if let Some(window) = broker.outbuf_latency { + set_value!("outbuf_latency.min", window.min); + set_value!("outbuf_latency.max", window.max); + set_value!("outbuf_latency.avg", window.avg); + set_value!("outbuf_latency.sum", window.sum); + set_value!("outbuf_latency.cnt", window.cnt); + set_value!("outbuf_latency.stddev", window.stddev); + set_value!("outbuf_latency.hdrsize", window.hdrsize); + set_value!("outbuf_latency.p50", window.p50); + set_value!("outbuf_latency.p75", window.p75); + set_value!("outbuf_latency.p90", window.p90); + set_value!("outbuf_latency.p95", window.p95); + set_value!("outbuf_latency.p99", window.p99); + set_value!("outbuf_latency.p99_99", window.p99_99); + set_value!("outbuf_latency.outofrange", window.outofrange); + } + } + } + } + + 
impl ConsumerContext for StatsContext {} + + impl StatsContext { + pub fn create_future_producer(config: &ClientConfig) -> KafkaResult<FutureProducer<Self>> { + FutureProducer::from_config_and_context(config, Self) + } + + pub fn create_stream_consumer(config: &ClientConfig) -> KafkaResult<StreamConsumer<Self>> { + StreamConsumer::from_config_and_context(config, Self) + } + } + + #[derive(Debug, Clone, Copy)] + pub enum GprcMessageKind { + Account, + Slot, + Transaction, + Block, + BlockMeta, + Entry, + Unknown, + } + + impl From<&UpdateOneof> for GprcMessageKind { + fn from(msg: &UpdateOneof) -> Self { + match msg { + UpdateOneof::Account(_) => Self::Account, + UpdateOneof::Slot(_) => Self::Slot, + UpdateOneof::Transaction(_) => Self::Transaction, + UpdateOneof::Block(_) => Self::Block, + UpdateOneof::Ping(_) => unreachable!(), + UpdateOneof::BlockMeta(_) => Self::BlockMeta, + UpdateOneof::Entry(_) => Self::Entry, + } + } + } + + impl GprcMessageKind { + const fn as_str(self) -> &'static str { + match self { + GprcMessageKind::Account => "account", + GprcMessageKind::Slot => "slot", + GprcMessageKind::Transaction => "transaction", + GprcMessageKind::Block => "block", + GprcMessageKind::BlockMeta => "blockmeta", + GprcMessageKind::Entry => "entry", + GprcMessageKind::Unknown => "unknown", + } + } + } + + pub fn sent_inc(kind: GprcMessageKind) { + KAFKA_SENT_TOTAL.with_label_values(&[kind.as_str()]).inc() + } + + pub fn recv_inc() { + KAFKA_RECV_TOTAL.inc(); + } + + pub fn dedup_inc() { + KAFKA_DEDUP_TOTAL.inc(); + } +}