kafka: add metrics (stats, sent, recv) (#196)

Kirill Fomichev 2023-10-09 17:12:44 +04:00 committed by GitHub
parent b9e734fe1d
commit f954647c59
3 changed files with 190 additions and 30 deletions


@@ -15,6 +15,7 @@ The minor version will be incremented upon a breaking change and the patch versi
### Features
- client: add `GeyserGrpcClient::subscribe_once2` ([#195](https://github.com/rpcpool/yellowstone-grpc/pull/195)).
- kafka: add metrics (stats, sent, recv) ([#196](https://github.com/rpcpool/yellowstone-grpc/pull/196)).
### Fixes


@@ -5,12 +5,7 @@ use {
future::{BoxFuture, FutureExt},
stream::StreamExt,
},
rdkafka::{
config::ClientConfig,
consumer::{Consumer, StreamConsumer},
message::Message,
producer::{FutureProducer, FutureRecord},
},
rdkafka::{config::ClientConfig, consumer::Consumer, message::Message, producer::FutureRecord},
sha2::{Digest, Sha256},
std::{net::SocketAddr, sync::Arc, time::Duration},
tokio::{
@@ -102,12 +97,12 @@ impl ArgsAction {
}
// input
let consumer: StreamConsumer = kafka_config.create()?;
let consumer = prom::kafka::StatsContext::create_stream_consumer(&kafka_config)
.context("failed to create kafka consumer")?;
consumer.subscribe(&[&config.kafka_input])?;
// output
let kafka: FutureProducer = kafka_config
.create()
let kafka = prom::kafka::StatsContext::create_future_producer(&kafka_config)
.context("failed to create kafka producer")?;
// dedup
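Both creation sites now go through `prom::kafka::StatsContext` (defined in the prom changes below) instead of `kafka_config.create()`. One caveat worth spelling out: librdkafka only invokes the stats callback when `statistics.interval.ms` is non-zero, so the client config is assumed to carry something like the following (broker address and interval are illustrative, not from this commit):

```rust
use rdkafka::config::ClientConfig;

// Illustrative config (assumed values): without a non-zero
// `statistics.interval.ms`, librdkafka never calls ClientContext::stats()
// and the kafka_stats gauges stay empty.
fn kafka_config() -> ClientConfig {
    let mut config = ClientConfig::new();
    config
        .set("bootstrap.servers", "localhost:9092") // assumed broker
        .set("statistics.interval.ms", "1000"); // emit stats every second
    config
}
```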
@@ -131,6 +126,7 @@ impl ArgsAction {
},
message = consumer.recv() => message,
}?;
prom::kafka::recv_inc();
trace!(
"received message with key: {:?}",
message.key().and_then(|k| std::str::from_utf8(k).ok())
@@ -171,11 +167,13 @@ impl ArgsAction {
debug!("kafka send message with key: {key}, result: {result:?}");
result?.map_err(|(error, _message)| error)?;
prom::kafka::sent_inc(prom::kafka::GprcMessageKind::Unknown);
Ok::<(), anyhow::Error>(())
}
Err(error) => Err(error.0.into()),
}
} else {
prom::kafka::dedup_inc();
Ok(())
}
});
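The `result?.map_err(...)` chain above unwraps two layers: awaiting the delivery future fails if it was canceled, and the delivery report itself is a `Result` of `(partition, offset)` versus `(KafkaError, OwnedMessage)`. A hypothetical helper making the pattern explicit (`await_delivery` is our name, not the codebase's; the `prom::kafka` paths assume this crate's layout):

```rust
use rdkafka::producer::DeliveryFuture;

// Hypothetical helper: unwrap both layers of a kafka delivery report and
// count the message only once the broker has confirmed it.
async fn await_delivery(future: DeliveryFuture) -> anyhow::Result<(i32, i64)> {
    let result = future.await; // Err(Canceled) if the producer went away
    let (partition, offset) = result?.map_err(|(error, _message)| error)?;
    prom::kafka::sent_inc(prom::kafka::GprcMessageKind::Unknown);
    Ok((partition, offset))
}
```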
@@ -207,8 +205,7 @@ impl ArgsAction {
}
// Connect to kafka
let kafka: FutureProducer = kafka_config
.create()
let kafka = prom::kafka::StatsContext::create_future_producer(&kafka_config)
.context("failed to create kafka producer")?;
// Create gRPC client & subscribe
@@ -244,23 +241,23 @@ impl ArgsAction {
match message {
Some(message) => {
if matches!(message.update_oneof, Some(UpdateOneof::Ping(_))) {
continue;
}
let slot = match &message.update_oneof {
Some(UpdateOneof::Account(msg)) => msg.slot,
Some(UpdateOneof::Slot(msg)) => msg.slot,
Some(UpdateOneof::Transaction(msg)) => msg.slot,
Some(UpdateOneof::Block(msg)) => msg.slot,
Some(UpdateOneof::Ping(_)) => unreachable!("Ping message not expected"),
Some(UpdateOneof::BlockMeta(msg)) => msg.slot,
Some(UpdateOneof::Entry(msg)) => msg.slot,
let payload = message.encode_to_vec();
let message = match &message.update_oneof {
Some(value) => value,
None => unreachable!("Expect valid message"),
};
let payload = message.encode_to_vec();
let slot = match message {
UpdateOneof::Account(msg) => msg.slot,
UpdateOneof::Slot(msg) => msg.slot,
UpdateOneof::Transaction(msg) => msg.slot,
UpdateOneof::Block(msg) => msg.slot,
UpdateOneof::Ping(_) => continue,
UpdateOneof::BlockMeta(msg) => msg.slot,
UpdateOneof::Entry(msg) => msg.slot,
};
let hash = Sha256::digest(&payload);
let key = format!("{slot}_{}", const_hex::encode(hash));
let prom_kind = prom::kafka::GprcMessageKind::from(message);
let record = FutureRecord::to(&config.kafka_topic)
.key(&key)
@@ -272,9 +269,9 @@ impl ArgsAction {
let result = future.await;
debug!("kafka send message with key: {key}, result: {result:?}");
Ok::<(i32, i64), anyhow::Error>(
result?.map_err(|(error, _message)| error)?,
)
let result = result?.map_err(|(error, _message)| error)?;
prom::kafka::sent_inc(prom_kind);
Ok::<(i32, i64), anyhow::Error>(result)
});
if send_tasks.len() >= config.kafka_queue_size {
tokio::select! {
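Besides the metrics, this hunk folds the Ping check into the slot match (`UpdateOneof::Ping(_) => continue`) and encodes the payload before matching, since the key, slot, and metric kind all derive from the same update. The dedup key scheme, extracted for clarity (the function name is ours):

```rust
use sha2::{Digest, Sha256};

// Dedup key as built above: slot number joined with the hex-encoded SHA-256
// of the encoded protobuf payload, so identical updates collide on the key.
fn dedup_key(slot: u64, payload: &[u8]) -> String {
    let hash = Sha256::digest(payload);
    format!("{slot}_{}", const_hex::encode(hash))
}
```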
@@ -311,7 +308,8 @@ impl ArgsAction {
let (grpc_tx, grpc_shutdown) = GrpcService::run(config.listen, config.channel_capacity)?;
let consumer: StreamConsumer = kafka_config.create()?;
let consumer = prom::kafka::StatsContext::create_stream_consumer(&kafka_config)
.context("failed to create kafka consumer")?;
consumer.subscribe(&[&config.kafka_topic])?;
loop {
@@ -319,6 +317,7 @@ impl ArgsAction {
_ = &mut shutdown => break,
message = consumer.recv() => message?,
};
prom::kafka::recv_inc();
debug!(
"received message with key: {:?}",
message.key().and_then(|k| std::str::from_utf8(k).ok())

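Both consumer paths above (dedup and kafka2grpc) now follow the same shape: create the consumer through `StatsContext`, subscribe, and bump `kafka_recv_total` for every message pulled off the stream. A condensed sketch (topic and function name are placeholders):

```rust
use rdkafka::{config::ClientConfig, consumer::Consumer, message::Message};

// Condensed consumer loop (assumed names): every received message is
// counted in kafka_recv_total before any further processing.
async fn consume(config: &ClientConfig, topic: &str) -> anyhow::Result<()> {
    let consumer = prom::kafka::StatsContext::create_stream_consumer(config)?;
    consumer.subscribe(&[topic])?;
    loop {
        let message = consumer.recv().await?;
        prom::kafka::recv_inc();
        let key = message.key().and_then(|k| std::str::from_utf8(k).ok());
        tracing::trace!("received message with key: {key:?}");
    }
}
```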

@@ -5,18 +5,36 @@ use {
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
},
prometheus::{IntCounterVec, Opts, Registry, TextEncoder},
prometheus::{GaugeVec, IntCounter, IntCounterVec, Opts, Registry, TextEncoder},
std::{net::SocketAddr, sync::Once},
tracing::{error, info},
};
lazy_static::lazy_static! {
pub static ref REGISTRY: Registry = Registry::new();
static ref REGISTRY: Registry = Registry::new();
static ref VERSION: IntCounterVec = IntCounterVec::new(
Opts::new("version", "Plugin version info"),
&["buildts", "git", "package", "proto", "rustc", "solana", "version"]
).unwrap();
static ref KAFKA_STATS: GaugeVec = GaugeVec::new(
Opts::new("kafka_stats", "librdkafka metrics"),
&["broker", "metric"]
).unwrap();
static ref KAFKA_DEDUP_TOTAL: IntCounter = IntCounter::new(
"kafka_dedup_total", "Total number of deduplicated messages"
).unwrap();
static ref KAFKA_RECV_TOTAL: IntCounter = IntCounter::new(
"kafka_recv_total", "Total number of received messages"
).unwrap();
static ref KAFKA_SENT_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("kafka_sent_total", "Total number of uploaded messages by type"),
&["kind"]
).unwrap();
}
pub fn run_server(address: Option<SocketAddr>) -> anyhow::Result<()> {
@@ -30,6 +48,10 @@ pub fn run_server(address: Option<SocketAddr>) -> anyhow::Result<()> {
};
}
register!(VERSION);
register!(KAFKA_STATS);
register!(KAFKA_DEDUP_TOTAL);
register!(KAFKA_RECV_TOTAL);
register!(KAFKA_SENT_TOTAL);
VERSION
.with_label_values(&[
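The `register!` macro invoked in this hunk is defined earlier in prom.rs and is not part of this diff; a plausible shape, assuming it boxes a clone of each collector into the module-level `REGISTRY`:

```rust
// Assumed definition (not shown in this diff): registration failures, e.g.
// duplicate metric names, panic at startup rather than pass silently.
macro_rules! register {
    ($collector:ident) => {
        REGISTRY
            .register(Box::new($collector.clone()))
            .expect("collector can't be registered");
    };
}
```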
@@ -82,3 +104,141 @@ fn not_found_handler() -> Response<Body> {
.body(Body::empty())
.unwrap()
}
pub mod kafka {
use {
super::{KAFKA_DEDUP_TOTAL, KAFKA_RECV_TOTAL, KAFKA_SENT_TOTAL, KAFKA_STATS},
rdkafka::{
client::ClientContext,
config::{ClientConfig, FromClientConfigAndContext},
consumer::{ConsumerContext, StreamConsumer},
error::KafkaResult,
producer::FutureProducer,
statistics::Statistics,
},
yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof,
};
#[derive(Debug, Default, Clone, Copy)]
pub struct StatsContext;
impl ClientContext for StatsContext {
fn stats(&self, statistics: Statistics) {
for (name, broker) in statistics.brokers {
macro_rules! set_value {
($name:expr, $value:expr) => {
KAFKA_STATS
.with_label_values(&[&name, $name])
.set($value as f64);
};
}
set_value!("outbuf_cnt", broker.outbuf_cnt);
set_value!("outbuf_msg_cnt", broker.outbuf_msg_cnt);
set_value!("waitresp_cnt", broker.waitresp_cnt);
set_value!("waitresp_msg_cnt", broker.waitresp_msg_cnt);
set_value!("tx", broker.tx);
set_value!("txerrs", broker.txerrs);
set_value!("txretries", broker.txretries);
set_value!("req_timeouts", broker.req_timeouts);
if let Some(window) = broker.int_latency {
set_value!("int_latency.min", window.min);
set_value!("int_latency.max", window.max);
set_value!("int_latency.avg", window.avg);
set_value!("int_latency.sum", window.sum);
set_value!("int_latency.cnt", window.cnt);
set_value!("int_latency.stddev", window.stddev);
set_value!("int_latency.hdrsize", window.hdrsize);
set_value!("int_latency.p50", window.p50);
set_value!("int_latency.p75", window.p75);
set_value!("int_latency.p90", window.p90);
set_value!("int_latency.p95", window.p95);
set_value!("int_latency.p99", window.p99);
set_value!("int_latency.p99_99", window.p99_99);
set_value!("int_latency.outofrange", window.outofrange);
}
if let Some(window) = broker.outbuf_latency {
set_value!("outbuf_latency.min", window.min);
set_value!("outbuf_latency.max", window.max);
set_value!("outbuf_latency.avg", window.avg);
set_value!("outbuf_latency.sum", window.sum);
set_value!("outbuf_latency.cnt", window.cnt);
set_value!("outbuf_latency.stddev", window.stddev);
set_value!("outbuf_latency.hdrsize", window.hdrsize);
set_value!("outbuf_latency.p50", window.p50);
set_value!("outbuf_latency.p75", window.p75);
set_value!("outbuf_latency.p90", window.p90);
set_value!("outbuf_latency.p95", window.p95);
set_value!("outbuf_latency.p99", window.p99);
set_value!("outbuf_latency.p99_99", window.p99_99);
set_value!("outbuf_latency.outofrange", window.outofrange);
}
}
}
}
impl ConsumerContext for StatsContext {}
impl StatsContext {
pub fn create_future_producer(config: &ClientConfig) -> KafkaResult<FutureProducer<Self>> {
FutureProducer::from_config_and_context(config, Self)
}
pub fn create_stream_consumer(config: &ClientConfig) -> KafkaResult<StreamConsumer<Self>> {
StreamConsumer::from_config_and_context(config, Self)
}
}
#[derive(Debug, Clone, Copy)]
pub enum GprcMessageKind {
Account,
Slot,
Transaction,
Block,
BlockMeta,
Entry,
Unknown,
}
impl From<&UpdateOneof> for GprcMessageKind {
fn from(msg: &UpdateOneof) -> Self {
match msg {
UpdateOneof::Account(_) => Self::Account,
UpdateOneof::Slot(_) => Self::Slot,
UpdateOneof::Transaction(_) => Self::Transaction,
UpdateOneof::Block(_) => Self::Block,
UpdateOneof::Ping(_) => unreachable!(),
UpdateOneof::BlockMeta(_) => Self::BlockMeta,
UpdateOneof::Entry(_) => Self::Entry,
}
}
}
impl GprcMessageKind {
const fn as_str(self) -> &'static str {
match self {
GprcMessageKind::Account => "account",
GprcMessageKind::Slot => "slot",
GprcMessageKind::Transaction => "transaction",
GprcMessageKind::Block => "block",
GprcMessageKind::BlockMeta => "blockmeta",
GprcMessageKind::Entry => "entry",
GprcMessageKind::Unknown => "unknown",
}
}
}
pub fn sent_inc(kind: GprcMessageKind) {
KAFKA_SENT_TOTAL.with_label_values(&[kind.as_str()]).inc()
}
pub fn recv_inc() {
KAFKA_RECV_TOTAL.inc();
}
pub fn dedup_inc() {
KAFKA_DEDUP_TOTAL.inc();
}
}
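Producer-side, the new pieces compose as follows: classify the update, send, and bump `kafka_sent_total` only on acknowledged delivery. A sketch under stated assumptions (topic and timeout are ours; note that `GprcMessageKind::from` panics on `Ping`, so callers filter pings first, as the grpc2kafka loop above does):

```rust
use {
    rdkafka::producer::{FutureProducer, FutureRecord},
    std::time::Duration,
    yellowstone_grpc_proto::prelude::subscribe_update::UpdateOneof,
};

// Sketch (assumed topic/timeout; `prom::kafka` paths assume this crate):
// derive the metric kind, send, and count only acknowledged messages.
async fn send_update(
    kafka: &FutureProducer<prom::kafka::StatsContext>,
    update: &UpdateOneof, // must not be Ping: the From impl panics on it
    key: &str,
    payload: &[u8],
) -> anyhow::Result<()> {
    let prom_kind = prom::kafka::GprcMessageKind::from(update);
    let record = FutureRecord::to("example-topic").key(key).payload(payload);
    kafka
        .send(record, Duration::from_secs(0))
        .await
        .map_err(|(error, _message)| error)?;
    prom::kafka::sent_inc(prom_kind);
    Ok(())
}
```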