kafka: support prometheus address in config (#198)

This commit is contained in:
Kirill Fomichev 2023-10-09 18:13:37 +04:00 committed by GitHub
parent 29b8cd57a3
commit 11dbee965d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 38 additions and 29 deletions

View File

@ -10,15 +10,23 @@ The minor version will be incremented upon a breaking change and the patch versi
## [Unreleased]
### Features
### Fixes
### Features
- client: add `GeyserGrpcClient::subscribe_once2` ([#195](https://github.com/rpcpool/yellowstone-grpc/pull/195)).
### Breaking
## 2023-10-09
- yellowstone-grpc-kafka-1.0.0-rc.3+solana.1.16.15
### Features
- kafka: add metrics (stats, sent, recv) ([#196](https://github.com/rpcpool/yellowstone-grpc/pull/196)).
- kafka: support YAML config ([#197](https://github.com/rpcpool/yellowstone-grpc/pull/197)).
### Fixes
- kafka: support prometheus address in config ([#198](https://github.com/rpcpool/yellowstone-grpc/pull/198)).
## 2023-10-06
@ -28,8 +36,6 @@ The minor version will be incremented upon a breaking change and the patch versi
- kafka: fix message size for gRPC client ([#195](https://github.com/rpcpool/yellowstone-grpc/pull/195)).
### Breaking
## 2023-10-05
- yellowstone-grpc-client-1.11.0+solana.1.16.15

2
Cargo.lock generated
View File

@ -4720,7 +4720,7 @@ dependencies = [
[[package]]
name = "yellowstone-grpc-kafka"
version = "1.0.0-rc.2+solana.1.16.15"
version = "1.0.0-rc.3+solana.1.16.15"
dependencies = [
"anyhow",
"async-trait",

View File

@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-kafka"
version = "1.0.0-rc.2+solana.1.16.15"
version = "1.0.0-rc.3+solana.1.16.15"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Kafka Producer/Dedup/Consumer"

View File

@ -1,6 +1,8 @@
{
"prometheus": "127.0.0.1:8873",
"kafka": {
"bootstrap.servers": "localhost:29092"
"bootstrap.servers": "localhost:29092",
"statistics.interval.ms": "1000"
},
"dedup": {
"kafka": {

View File

@ -38,7 +38,7 @@ struct Args {
#[clap(short, long)]
config: String,
/// Prometheus listen address
/// [DEPRECATED: use config] Prometheus listen address
#[clap(long)]
prometheus: Option<SocketAddr>,
@ -358,7 +358,9 @@ async fn main() -> anyhow::Result<()> {
let config = Config::load(&args.config).await?;
// Run prometheus server
prom::run_server(args.prometheus)?;
if let Some(address) = config.prometheus.or(args.prometheus) {
prom::run_server(address)?;
}
// Create kafka config
let mut kafka_config = ClientConfig::new();

View File

@ -28,6 +28,7 @@ pub trait GrpcRequestToProto<T> {
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
pub struct Config {
pub prometheus: Option<SocketAddr>,
pub kafka: HashMap<String, String>,
pub dedup: Option<ConfigDedup>,
pub grpc2kafka: Option<ConfigGrpc2Kafka>,

View File

@ -37,7 +37,7 @@ lazy_static::lazy_static! {
).unwrap();
}
pub fn run_server(address: Option<SocketAddr>) -> anyhow::Result<()> {
pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
static REGISTER: Once = Once::new();
REGISTER.call_once(|| {
macro_rules! register {
@ -66,24 +66,22 @@ pub fn run_server(address: Option<SocketAddr>) -> anyhow::Result<()> {
.inc();
});
if let Some(address) = address {
let make_service = make_service_fn(move |_: &AddrStream| async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let response = match req.uri().path() {
"/metrics" => metrics_handler(),
_ => not_found_handler(),
};
Ok::<_, hyper::Error>(response)
}))
});
let server = Server::try_bind(&address)?.serve(make_service);
info!("prometheus server started: {address:?}");
tokio::spawn(async move {
if let Err(error) = server.await {
error!("prometheus server failed: {error:?}");
}
});
}
let make_service = make_service_fn(move |_: &AddrStream| async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
let response = match req.uri().path() {
"/metrics" => metrics_handler(),
_ => not_found_handler(),
};
Ok::<_, hyper::Error>(response)
}))
});
let server = Server::try_bind(&address)?.serve(make_service);
info!("prometheus server started: {address:?}");
tokio::spawn(async move {
if let Err(error) = server.await {
error!("prometheus server failed: {error:?}");
}
});
Ok(())
}