tools: add metrics, new config for google-pubsub (#280)

This commit is contained in:
Kirill Fomichev 2024-01-23 15:58:17 -05:00 committed by GitHub
parent 260bf36c1f
commit 5629485f87
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 351 additions and 116 deletions

View File

@ -18,6 +18,8 @@ The minor version will be incremented upon a breaking change and the patch versi
### Breaking
- tools: add metrics, new config for google-pubsub ([#280](https://github.com/rpcpool/yellowstone-grpc/pull/280))
## 2024-01-15
- yellowstone-grpc-client-1.13.0+solana.1.17.16

View File

@ -12,8 +12,6 @@ Block reconstruction inside gRPC plugin based on information provided by BlockMe
### Validator
The current plugin version (`+solana.1.16.x`) uses a validator with `ReplicaBlockInfoV3` backported to the Geyser interface — https://github.com/solana-labs/solana/pull/33359. As a result, it is not compatible with the original validator from Solana Labs and will not work. You need to compile the validator from source; patched releases can be found in the `Triton One` Solana fork: https://github.com/rpcpool/solana-public/tree/v1.16.16-geyser-block-v3.
```bash
$ solana-validator --geyser-plugin-config yellowstone-grpc-geyser/config.json
```

View File

@ -60,8 +60,8 @@ pub enum GeyserGrpcClientError {
pub type GeyserGrpcClientResult<T> = Result<T, GeyserGrpcClientError>;
pub struct GeyserGrpcClient<F> {
health: HealthClient<InterceptedService<Channel, F>>,
geyser: GeyserClient<InterceptedService<Channel, F>>,
pub health: HealthClient<InterceptedService<Channel, F>>,
pub geyser: GeyserClient<InterceptedService<Channel, F>>,
}
impl GeyserGrpcClient<()> {

View File

@ -1,7 +1,10 @@
{
"prometheus": "127.0.0.1:8873",
// "with_auth": true, // Auth with DefaultTokenSourceProvider
// "with_credentials": "path_to_json_file_with_creds", // or use envs: GOOGLE_APPLICATION_CREDENTIALS / GOOGLE_APPLICATION_CREDENTIALS_JSON
"client": {
// "with_auth": true, // Auth with DefaultTokenSourceProvider
// "with_credentials": "path_to_json_file_with_creds", // or use envs: GOOGLE_APPLICATION_CREDENTIALS / GOOGLE_APPLICATION_CREDENTIALS_JSON
"pool_size": 4
},
"grpc2pubsub": {
"endpoint": "http://127.0.0.1:10000",
"x_token": null,
@ -20,12 +23,22 @@
}
}
},
"max_message_size": "536_870_912", // 512MiB
"topic": "grpc",
"create_if_not_exists": true,
"workers": 3,
"flush_interval_ms": 100,
"bundle_size": 3,
"bulk_max_size": 10,
"bulk_max_wait_ms": 100
"publisher": {
"workers": 3,
"flush_interval_ms": 100,
"bundle_size": 3
},
"batch": {
"max_messages": 10,
"max_size_bytes": "9_500_000",
"max_wait_ms": 100,
"max_in_progress": 100
}
}
}

View File

@ -1,14 +1,18 @@
use {
anyhow::Context,
clap::{Parser, Subcommand},
futures::{future::BoxFuture, stream::StreamExt},
futures::{
future::{pending, BoxFuture, FutureExt},
stream::StreamExt,
},
google_cloud_googleapis::pubsub::v1::PubsubMessage,
google_cloud_pubsub::{client::Client, subscription::SubscriptionConfig},
std::{net::SocketAddr, time::Duration},
tokio::{task::JoinSet, time::sleep},
tracing::{info, warn},
tracing::{debug, error, info, warn},
yellowstone_grpc_client::GeyserGrpcClient,
yellowstone_grpc_proto::{
prelude::{subscribe_update::UpdateOneof, SubscribeUpdate},
prelude::{subscribe_update::UpdateOneof, CommitmentLevel, SubscribeUpdate},
prost::Message as _,
},
yellowstone_grpc_tools::{
@ -62,7 +66,7 @@ enum ArgsAction {
impl ArgsAction {
async fn run(self, config: Config) -> anyhow::Result<()> {
let shutdown = create_shutdown()?;
let client = config.create_client().await?;
let client = config.client.create_client().await?;
match self {
ArgsAction::Grpc2PubSub => {
@ -95,18 +99,25 @@ impl ArgsAction {
) -> anyhow::Result<()> {
// Connect to Pub/Sub and create topic if not exists
let topic = client.topic(&config.topic);
if !topic.exists(None).await? {
if !topic
.exists(None)
.await
.with_context(|| format!("failed to get topic: {}", config.topic))?
{
anyhow::ensure!(
config.create_if_not_exists,
"topic {} doesn't exists",
config.topic
);
topic.create(None, None).await?;
topic
.create(None, None)
.await
.with_context(|| format!("failed to create topic: {}", config.topic))?;
}
let publisher = topic.new_publisher(Some(config.get_publisher_config()));
let publisher = topic.new_publisher(Some(config.publisher.get_publisher_config()));
// Create gRPC client & subscribe
let mut client = GeyserGrpcClient::connect_with_timeout(
let client = GeyserGrpcClient::connect_with_timeout(
config.endpoint,
config.x_token,
None,
@ -115,71 +126,124 @@ impl ArgsAction {
false,
)
.await?;
let mut client = GeyserGrpcClient::new(
client.health,
client
.geyser
.max_decoding_message_size(config.max_message_size),
);
let mut geyser = client.subscribe_once2(config.request.to_proto()).await?;
// Receive-send loop
let mut send_tasks = JoinSet::new();
'outer: loop {
let sleep = sleep(Duration::from_millis(config.bulk_max_wait_ms as u64));
let mut prefetched_message: Option<(PubsubMessage, GprcMessageKind)> = None;
'receive_send_loop: loop {
let sleep = sleep(config.batch.max_wait);
tokio::pin!(sleep);
let mut messages_size = 0;
let mut messages = vec![];
let mut prom_kind = vec![];
while messages.len() < config.bulk_max_size {
let message = tokio::select! {
_ = &mut shutdown => break 'outer,
let mut prom_kinds = vec![];
loop {
if let Some((message, prom_kind)) = prefetched_message.take() {
if messages.len() < config.batch.max_messages
&& messages_size + message.data.len() <= config.batch.max_size_bytes
{
messages_size += message.data.len();
messages.push(message);
prom_kinds.push(prom_kind);
} else if message.data.len() > config.batch.max_size_bytes {
prom::drop_oversized_inc(prom_kind);
debug!("drop {prom_kind:?} message, size: {}", message.data.len());
} else {
prefetched_message = Some((message, prom_kind));
break;
}
}
let send_task_fut = if send_tasks.is_empty() {
pending().boxed()
} else {
send_tasks.join_next().boxed()
};
tokio::select! {
_ = &mut shutdown => break 'receive_send_loop,
_ = &mut sleep => break,
maybe_result = send_tasks.join_next() => match maybe_result {
maybe_result = send_task_fut => match maybe_result {
Some(result) => {
result??;
prom::send_batches_dec();
result?;
continue;
}
None => tokio::select! {
_ = &mut shutdown => break 'outer,
_ = &mut sleep => break,
message = geyser.next() => message,
}
None => unreachable!()
},
message = geyser.next() => message,
}
.transpose()?;
let message = match message {
Some(message) => message,
None => break 'outer,
};
message = geyser.next() => {
let message = message
.ok_or_else(|| anyhow::anyhow!("gRPC stream finished"))?
.context("failed to get message from gRPC")?;
match &message.update_oneof {
Some(UpdateOneof::Ping(_)) => continue,
Some(UpdateOneof::Pong(_)) => continue,
Some(value) => prom_kind.push(GprcMessageKind::from(value)),
None => unreachable!("Expect valid message"),
};
match &message {
SubscribeUpdate { filters: _, update_oneof: Some(UpdateOneof::Ping(_)) } => prom::recv_inc(GprcMessageKind::Ping),
SubscribeUpdate { filters: _, update_oneof: Some(UpdateOneof::Pong(_)) } => prom::recv_inc(GprcMessageKind::Pong),
SubscribeUpdate { filters: _, update_oneof: Some(value) } => {
if let UpdateOneof::Slot(slot) = value {
prom::set_slot_tip(
CommitmentLevel::try_from(slot.status).expect("valid commitment"),
slot.slot.try_into().expect("valid i64 slot"),
);
}
messages.push(PubsubMessage {
data: message.encode_to_vec(),
..Default::default()
});
let message = PubsubMessage {
data: message.encode_to_vec(),
..Default::default()
};
let prom_kind = GprcMessageKind::from(value);
prefetched_message = Some((message, prom_kind));
prom::recv_inc(prom_kind);
},
SubscribeUpdate { filters: _, update_oneof: None } => anyhow::bail!("received empty updat emessage"),
};
}
};
}
if messages.is_empty() {
continue;
}
for (awaiter, prom_kind) in publisher
.publish_bulk(messages)
.await
.into_iter()
.zip(prom_kind.into_iter())
{
send_tasks.spawn(async move {
awaiter.get().await?;
prom::sent_inc(prom_kind);
Ok::<(), anyhow::Error>(())
});
while send_tasks.len() >= config.batch.max_in_progress {
if let Some(result) = send_tasks.join_next().await {
prom::send_batches_dec();
result?;
}
}
let awaiters = publisher.publish_bulk(messages).await;
for prom_kind in prom_kinds.iter().copied() {
prom::send_awaiters_inc(prom_kind);
}
send_tasks.spawn(async move {
for (awaiter, prom_kind) in awaiters.into_iter().zip(prom_kinds.into_iter()) {
let status = if let Err(error) = awaiter.get().await {
error!("failed to send message {prom_kind:?}, error: {error:?}");
Err(())
} else {
Ok(())
};
prom::sent_inc(prom_kind, status);
prom::send_awaiters_dec(prom_kind);
}
});
prom::send_batches_inc();
}
warn!("shutdown received...");
while let Some(result) = send_tasks.join_next().await {
result??;
result?;
}
Ok(())
}
@ -241,11 +305,14 @@ async fn main() -> anyhow::Result<()> {
// Parse args
let args = Args::parse();
let config = config_load::<Config>(&args.config).await?;
let config = config_load::<Config>(&args.config)
.await
.with_context(|| format!("failed to load config from file: {}", args.config))?;
// Run prometheus server
if let Some(address) = args.prometheus.or(config.prometheus) {
prometheus_run_server(address)?;
prometheus_run_server(address)
.with_context(|| format!("failed to run server at: {:?}", address))?;
}
args.action.run(config).await

View File

@ -4,6 +4,7 @@ use {
std::{
collections::{HashMap, HashSet},
path::Path,
time::Duration,
},
tokio::fs,
yellowstone_grpc_proto::prelude::{
@ -237,6 +238,13 @@ where
.map_err(de::Error::custom),
}
}
pub fn deserialize_duration_ms_str<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: de::Deserializer<'de>,
{
let ms = deserialize_usize_str(deserializer)?;
Ok(Duration::from_millis(ms as u64))
}
#[cfg(test)]
mod tests {

View File

@ -1,7 +1,10 @@
use {
crate::config::{deserialize_usize_str, ConfigGrpcRequest},
crate::config::{deserialize_duration_ms_str, deserialize_usize_str, ConfigGrpcRequest},
google_cloud_pubsub::{
client::{google_cloud_auth::credentials::CredentialsFile, Client, ClientConfig},
client::{
google_cloud_auth::credentials::CredentialsFile, Client,
ClientConfig as PubsubClientConfig,
},
publisher::PublisherConfig,
},
serde::Deserialize,
@ -12,70 +15,100 @@ use {
#[serde(default)]
pub struct Config {
pub prometheus: Option<SocketAddr>,
pub with_auth: Option<bool>,
pub with_credentials: Option<String>,
pub client: ConfigClient,
pub grpc2pubsub: Option<ConfigGrpc2PubSub>,
}
impl Config {
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ConfigClient {
pub with_auth: Option<bool>,
pub with_credentials: Option<String>,
pub pool_size: Option<usize>,
}
impl Default for ConfigClient {
fn default() -> Self {
Self {
with_auth: None,
with_credentials: None,
pool_size: PubsubClientConfig::default().pool_size,
}
}
}
impl ConfigClient {
pub async fn create_client(&self) -> anyhow::Result<Client> {
let mut config = ClientConfig::default();
if matches!(self.with_auth, Some(true)) {
config = config.with_auth().await?;
} else if let Some(creds) = match self.with_credentials.clone() {
Some(filepath) => CredentialsFile::new_from_file(filepath).await.map(Some),
None => {
let mut config = PubsubClientConfig::default();
match (self.with_auth, self.with_credentials.as_ref()) {
(Some(true), Some(_creds)) => {
anyhow::bail!("Only `with_auth` or `with_credentials` can be enabled")
}
(Some(true), _) => {
config = config.with_auth().await?;
}
(_, Some(filepath)) => {
let creds = CredentialsFile::new_from_file(filepath.clone()).await?;
config = config.with_credentials(creds).await?;
}
(_, None) => {
if std::env::var("GOOGLE_APPLICATION_CREDENTIALS_JSON").is_ok()
|| std::env::var("GOOGLE_APPLICATION_CREDENTIALS").is_ok()
{
CredentialsFile::new().await.map(Some)
} else {
Ok(None)
let creds = CredentialsFile::new().await?;
config = config.with_credentials(creds).await?;
}
}
}? {
config = config.with_credentials(creds).await?;
}
Client::new(config).await.map_err(Into::into)
}
}
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
#[derive(Debug, Deserialize)]
pub struct ConfigGrpc2PubSub {
pub endpoint: String,
pub x_token: Option<String>,
pub request: ConfigGrpcRequest,
#[serde(
default = "ConfigGrpc2PubSub::default_max_message_size",
deserialize_with = "deserialize_usize_str"
)]
pub max_message_size: usize,
pub topic: String,
// Create `topic` with default config if not exists
#[serde(default)]
pub create_if_not_exists: bool,
// Publisher config
#[serde(default = "ConfigGrpc2PubSub::default_workers")]
pub workers: usize,
#[serde(
default = "ConfigGrpc2PubSub::default_flush_interval_ms",
deserialize_with = "deserialize_usize_str"
)]
pub flush_interval_ms: usize,
#[serde(default = "ConfigGrpc2PubSub::default_bundle_size")]
pub bundle_size: usize,
pub publisher: ConfigGrpc2PubSubPublisher,
// Publisher bulk config
#[serde(
default = "ConfigGrpc2PubSub::default_bulk_max_size",
deserialize_with = "deserialize_usize_str"
)]
pub bulk_max_size: usize,
#[serde(
default = "ConfigGrpc2PubSub::default_bulk_max_wait_ms",
deserialize_with = "deserialize_usize_str"
)]
pub bulk_max_wait_ms: usize,
// Publisher bulk/batch config
pub batch: ConfigGrpc2PubSubBatch,
}
impl ConfigGrpc2PubSub {
const fn default_max_message_size() -> usize {
512 * 1024 * 1024
}
}
#[derive(Debug, Deserialize)]
pub struct ConfigGrpc2PubSubPublisher {
#[serde(default = "ConfigGrpc2PubSubPublisher::default_workers")]
pub workers: usize,
#[serde(
default = "ConfigGrpc2PubSubPublisher::default_flush_interval_ms",
deserialize_with = "deserialize_usize_str"
)]
pub flush_interval_ms: usize,
#[serde(default = "ConfigGrpc2PubSubPublisher::default_bundle_size")]
pub bundle_size: usize,
}
impl ConfigGrpc2PubSubPublisher {
fn default_workers() -> usize {
PublisherConfig::default().workers
}
@ -88,14 +121,6 @@ impl ConfigGrpc2PubSub {
PublisherConfig::default().bundle_size
}
const fn default_bulk_max_size() -> usize {
10
}
const fn default_bulk_max_wait_ms() -> usize {
100
}
pub const fn get_publisher_config(&self) -> PublisherConfig {
PublisherConfig {
workers: self.workers,
@ -105,3 +130,33 @@ impl ConfigGrpc2PubSub {
}
}
}
#[derive(Debug, Deserialize)]
#[serde(default)]
pub struct ConfigGrpc2PubSubBatch {
#[serde(deserialize_with = "deserialize_usize_str")]
pub max_messages: usize,
#[serde(deserialize_with = "deserialize_usize_str")]
pub max_size_bytes: usize,
#[serde(
deserialize_with = "deserialize_duration_ms_str",
rename = "max_wait_ms"
)]
pub max_wait: Duration,
#[serde(deserialize_with = "deserialize_usize_str")]
pub max_in_progress: usize,
}
impl Default for ConfigGrpc2PubSubBatch {
fn default() -> Self {
Self {
max_messages: 10,
max_size_bytes: 9_500_000,
max_wait: Duration::from_millis(100),
max_in_progress: 100,
}
}
}

View File

@ -1,17 +1,96 @@
use {
crate::prom::GprcMessageKind,
prometheus::{IntCounterVec, Opts},
prometheus::{Gauge, IntCounterVec, IntGaugeVec, Opts},
yellowstone_grpc_proto::prelude::CommitmentLevel,
};
lazy_static::lazy_static! {
pub(crate) static ref GOOGLE_PUBSUB_SENT_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("google_pubsub_sent_total", "Total number of uploaded messages by type"),
pub(crate) static ref GOOGLE_PUBSUB_RECV_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("google_pubsub_recv_total", "Total number of received messages from gRPC by type"),
&["kind"]
).unwrap();
pub(crate) static ref GOOGLE_PUBSUB_SENT_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("google_pubsub_sent_total", "Total number of uploaded messages to pubsub by type"),
&["kind", "status"]
).unwrap();
pub(crate) static ref GOOGLE_PUBSUB_SEND_BATCHES_IN_PROGRESS: Gauge = Gauge::new(
"google_pubsub_send_batches_in_progress", "Number of batches in progress"
).unwrap();
pub(crate) static ref GOOGLE_PUBSUB_AWAITERS_IN_PROGRESS: IntGaugeVec = IntGaugeVec::new(
Opts::new("google_pubsub_awaiters_in_progress", "Number of awaiters in progress by type"),
&["kind"]
).unwrap();
pub(crate) static ref GOOGLE_PUBSUB_DROP_OVERSIZED_TOTAL: IntCounterVec = IntCounterVec::new(
Opts::new("google_pubsub_drop_oversized_total", "Total number of dropped oversized messages"),
&["kind"]
).unwrap();
pub(crate) static ref GOOGLE_PUBSUB_SLOT_TIP: IntGaugeVec = IntGaugeVec::new(
Opts::new("google_pubsub_slot_tip", "Latest received slot from gRPC by commitment"),
&["commitment"]
).unwrap();
}
pub fn sent_inc(kind: GprcMessageKind) {
pub fn recv_inc(kind: GprcMessageKind) {
GOOGLE_PUBSUB_RECV_TOTAL
.with_label_values(&[kind.as_str()])
.inc();
GOOGLE_PUBSUB_RECV_TOTAL.with_label_values(&["total"]).inc()
}
pub fn sent_inc(kind: GprcMessageKind, status: Result<(), ()>) {
let status = if status.is_ok() { "success" } else { "failed" };
GOOGLE_PUBSUB_SENT_TOTAL
.with_label_values(&[kind.as_str(), status])
.inc();
GOOGLE_PUBSUB_SENT_TOTAL
.with_label_values(&["total", status])
.inc()
}
pub fn send_batches_inc() {
GOOGLE_PUBSUB_SEND_BATCHES_IN_PROGRESS.inc()
}
pub fn send_batches_dec() {
GOOGLE_PUBSUB_SEND_BATCHES_IN_PROGRESS.dec()
}
pub fn send_awaiters_inc(kind: GprcMessageKind) {
GOOGLE_PUBSUB_AWAITERS_IN_PROGRESS
.with_label_values(&[kind.as_str()])
.inc();
GOOGLE_PUBSUB_AWAITERS_IN_PROGRESS
.with_label_values(&["total"])
.inc()
}
pub fn send_awaiters_dec(kind: GprcMessageKind) {
GOOGLE_PUBSUB_AWAITERS_IN_PROGRESS
.with_label_values(&[kind.as_str()])
.dec();
GOOGLE_PUBSUB_AWAITERS_IN_PROGRESS
.with_label_values(&["total"])
.dec()
}
pub fn drop_oversized_inc(kind: GprcMessageKind) {
GOOGLE_PUBSUB_DROP_OVERSIZED_TOTAL
.with_label_values(&[kind.as_str()])
.inc()
}
pub fn set_slot_tip(commitment: CommitmentLevel, slot: i64) {
GOOGLE_PUBSUB_SLOT_TIP
.with_label_values(&[match commitment {
CommitmentLevel::Processed => "processed",
CommitmentLevel::Confirmed => "confirmed",
CommitmentLevel::Finalized => "finalized",
}])
.set(slot)
}

View File

@ -1,5 +1,9 @@
#[cfg(feature = "google-pubsub")]
use crate::google_pubsub::prom::GOOGLE_PUBSUB_SENT_TOTAL;
use crate::google_pubsub::prom::{
GOOGLE_PUBSUB_AWAITERS_IN_PROGRESS, GOOGLE_PUBSUB_DROP_OVERSIZED_TOTAL,
GOOGLE_PUBSUB_RECV_TOTAL, GOOGLE_PUBSUB_SEND_BATCHES_IN_PROGRESS, GOOGLE_PUBSUB_SENT_TOTAL,
GOOGLE_PUBSUB_SLOT_TIP,
};
#[cfg(feature = "kafka")]
use crate::kafka::prom::{KAFKA_DEDUP_TOTAL, KAFKA_RECV_TOTAL, KAFKA_SENT_TOTAL, KAFKA_STATS};
use {
@ -37,7 +41,12 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(VERSION);
#[cfg(feature = "google-pubsub")]
{
register!(GOOGLE_PUBSUB_RECV_TOTAL);
register!(GOOGLE_PUBSUB_SENT_TOTAL);
register!(GOOGLE_PUBSUB_SEND_BATCHES_IN_PROGRESS);
register!(GOOGLE_PUBSUB_AWAITERS_IN_PROGRESS);
register!(GOOGLE_PUBSUB_DROP_OVERSIZED_TOTAL);
register!(GOOGLE_PUBSUB_SLOT_TIP);
}
#[cfg(feature = "kafka")]
{
@ -103,6 +112,8 @@ pub enum GprcMessageKind {
Slot,
Transaction,
Block,
Ping,
Pong,
BlockMeta,
Entry,
Unknown,
@ -115,8 +126,8 @@ impl From<&UpdateOneof> for GprcMessageKind {
UpdateOneof::Slot(_) => Self::Slot,
UpdateOneof::Transaction(_) => Self::Transaction,
UpdateOneof::Block(_) => Self::Block,
UpdateOneof::Ping(_) => unreachable!(),
UpdateOneof::Pong(_) => unreachable!(),
UpdateOneof::Ping(_) => Self::Ping,
UpdateOneof::Pong(_) => Self::Pong,
UpdateOneof::BlockMeta(_) => Self::BlockMeta,
UpdateOneof::Entry(_) => Self::Entry,
}
@ -130,6 +141,8 @@ impl GprcMessageKind {
GprcMessageKind::Slot => "slot",
GprcMessageKind::Transaction => "transaction",
GprcMessageKind::Block => "block",
GprcMessageKind::Ping => "ping",
GprcMessageKind::Pong => "pong",
GprcMessageKind::BlockMeta => "blockmeta",
GprcMessageKind::Entry => "entry",
GprcMessageKind::Unknown => "unknown",