clippy+fmt

This commit is contained in:
GroovieGermanikus 2024-01-26 19:25:39 +01:00
parent db034040e8
commit 68221ce0cc
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
9 changed files with 37 additions and 55 deletions

View File

@ -5,16 +5,9 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;
use geyser_grpc_connector::channel_plugger::{
spawn_broadcast_channel_plug, spawn_plugger_mpcs_to_broadcast,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::{
create_geyser_autoconnection_task,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::channel_plugger::spawn_broadcast_channel_plug;
use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task;
use geyser_grpc_connector::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
use geyser_grpc_connector::{GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use tokio::time::{sleep, Duration};
use tracing::warn;
@ -22,20 +15,6 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use yellowstone_grpc_proto::prost::Message as _;
fn start_example_blockmini_consumer(
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
) {
tokio::spawn(async move {
let mut blockmeta_stream = pin!(multiplex_stream);
while let Some(mini) = blockmeta_stream.next().await {
info!(
"emitted block mini #{}@{} with {} bytes from multiplexer",
mini.slot, mini.commitment_config.commitment, mini.blocksize
);
}
});
}
pub struct BlockMini {
pub blocksize: usize,
pub slot: Slot,
@ -73,7 +52,7 @@ impl FromYellowstoneExtractor for BlockMiniExtractor {
}
}
#[warn(dead_code)]
#[allow(dead_code)]
enum TestCases {
Basic,
SlowReceiverStartup,
@ -102,6 +81,7 @@ pub async fn main() {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let green_config =

View File

@ -129,6 +129,7 @@ pub async fn main() {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let green_config =

View File

@ -5,9 +5,7 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::env;
use std::pin::pin;
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::{
create_geyser_reconnecting_stream,
};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
@ -88,10 +86,10 @@ pub async fn main() {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
receive_timeout: Duration::from_secs(5),
};
let config =
GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
let config = GrpcSourceConfig::new(grpc_addr_green, grpc_x_token_green, None, timeouts.clone());
info!("Write Block stream..");
@ -156,7 +154,6 @@ pub async fn main() {
sleep(Duration::from_secs(1800)).await;
}
fn map_block_update(update: SubscribeUpdate) -> Option<Slot> {
match update.update_oneof {
Some(UpdateOneof::Block(update_block_message)) => {

View File

@ -2,8 +2,6 @@ use log::{debug, info, warn};
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::time::{sleep, timeout};
use crate::grpcmultiplex_fastestwins::FromYellowstoneExtractor;
/// usage: see plug_pattern test
pub fn spawn_broadcast_channel_plug<T: Send + 'static>(
@ -43,11 +41,12 @@ pub fn spawn_plugger_mpcs_to_broadcast<T: Send + 'static>(
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::{sleep, timeout};
#[tokio::test]
async fn plug_pattern() {
let (jh_task, message_channel) = tokio::sync::mpsc::channel::<u32>(1);
let broadcast_rx =
let (_jh_task, message_channel) = tokio::sync::mpsc::channel::<u32>(1);
let _broadcast_rx =
spawn_broadcast_channel_plug(tokio::sync::broadcast::channel(8), message_channel);
}

View File

@ -1,15 +1,10 @@
// use crate::{
// endpoint_stremers::EndpointStreaming,
// rpc_polling::vote_accounts_and_cluster_info_polling::poll_vote_accounts_and_cluster_info,
// };
use anyhow::{bail, Context};
use futures::{Stream, StreamExt};
use futures::StreamExt;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::broadcast::Sender;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::SubscribeRequest;
use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
SubscribeUpdateBlock,

View File

@ -1,14 +1,12 @@
use crate::{Attempt, GrpcSourceConfig, Message};
use async_stream::stream;
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use std::fmt::{Debug, Display};
use log::{debug, info, log, trace, warn, Level};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientResult};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::service::Interceptor;
use yellowstone_grpc_proto::tonic::Status;
enum ConnectionState<S: Stream<Item = Result<SubscribeUpdate, Status>>> {
@ -143,6 +141,7 @@ mod tests {
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(2),
subscribe_timeout: Duration::from_secs(3),
receive_timeout: Duration::from_secs(3),
};
assert_eq!(
format!(
@ -164,6 +163,7 @@ mod tests {
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(2),
subscribe_timeout: Duration::from_secs(3),
receive_timeout: Duration::from_secs(3),
};
assert_eq!(
format!(

View File

@ -1,14 +1,11 @@
use crate::{GrpcSourceConfig, Message};
use futures::{Stream, StreamExt};
use log::{debug, error, info, log, trace, warn, Level};
use std::fmt::{Debug, Display};
use std::future::Future;
use std::time::Duration;
use tokio::sync::mpsc::error::SendTimeoutError;
use tokio::sync::mpsc::Receiver;
use tokio::task::AbortHandle;
use tokio::time::{sleep, timeout, Instant};
use tokio::time::error::Elapsed;
use yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError};
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeUpdate};
use yellowstone_grpc_proto::tonic::service::Interceptor;
@ -214,7 +211,12 @@ pub fn create_geyser_autoconnection_task(
ConnectionState::Ready(attempt, mut geyser_stream) => {
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
'recv_loop: loop {
match timeout(receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next()).await {
match timeout(
receive_timeout.unwrap_or(Duration::MAX),
geyser_stream.next(),
)
.await
{
Ok(Some(Ok(update_message))) => {
trace!("> recv update message from {}", grpc_source);
// note: first send never blocks as the mpsc channel has capacity 1
@ -309,6 +311,7 @@ mod tests {
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(2),
subscribe_timeout: Duration::from_secs(3),
receive_timeout: Duration::from_secs(3),
};
assert_eq!(
format!(
@ -330,6 +333,7 @@ mod tests {
connect_timeout: Duration::from_secs(1),
request_timeout: Duration::from_secs(2),
subscribe_timeout: Duration::from_secs(3),
receive_timeout: Duration::from_secs(3),
};
assert_eq!(
format!(

View File

@ -1,11 +1,11 @@
use crate::Message;
use crate::Message::GeyserSubscribeUpdate;
use async_stream::stream;
use futures::{Stream, StreamExt};
use log::{info, warn};
use merge_streams::MergeStreams;
use solana_sdk::clock::Slot;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
use crate::Message::GeyserSubscribeUpdate;
pub trait FromYellowstoneExtractor {
// Target is something like ProducedBlock

View File

@ -2,7 +2,10 @@ use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::time::Duration;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate};
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocks,
SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots, SubscribeUpdate,
};
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
pub mod channel_plugger;
@ -28,6 +31,7 @@ pub struct GrpcConnectionTimeouts {
pub connect_timeout: Duration,
pub request_timeout: Duration,
pub subscribe_timeout: Duration,
pub receive_timeout: Duration,
}
#[derive(Clone)]
@ -127,10 +131,12 @@ impl GeyserFilter {
pub fn slots(&self) -> SubscribeRequest {
let mut slots_subs = HashMap::new();
slots_subs.insert("client".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
});
slots_subs.insert(
"client".to_string(),
SubscribeRequestFilterSlots {
filter_by_commitment: Some(true),
},
);
SubscribeRequest {
slots: slots_subs,