This commit is contained in:
GroovieGermanikus 2023-12-13 16:04:03 +01:00
parent 2247116fc9
commit dc8ff553b5
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
3 changed files with 28 additions and 61 deletions

View File

@ -1,27 +1,10 @@
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::ops::{Add, Deref, Sub};
use std::pin::{pin, Pin};
use anyhow::{bail, Context};
use async_stream::stream;
use futures::{Stream, StreamExt};
use itertools::{Itertools};
use log::{debug, error, info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::{select};
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::{sleep, Duration, timeout, Instant, sleep_until};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock, SubscribeUpdateBlockMeta};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
use log::{info};
use tokio::sync::broadcast::{Receiver};
use tokio::time::{sleep, Duration};
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeUpdateBlock};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{create_multiplex, GrpcSourceConfig};
fn start_example_consumer(blocks_notifier: Receiver<SubscribeUpdateBlock>) {
fn start_example_consumer(blocks_notifier: Receiver<Box<SubscribeUpdateBlock>>) {
tokio::spawn(async move {
let mut blocks_notifier = blocks_notifier;
loop {

View File

@ -3,28 +3,16 @@
// rpc_polling::vote_accounts_and_cluster_info_polling::poll_vote_accounts_and_cluster_info,
// };
use anyhow::{bail, Context};
use futures::{Stream, StreamExt};
use itertools::Itertools;
use futures::{StreamExt};
use solana_sdk::{
borsh0_10::try_from_slice_unchecked,
commitment_config::CommitmentConfig,
compute_budget::{self, ComputeBudgetInstruction},
hash::Hash,
instruction::CompiledInstruction,
message::{
v0::{self, MessageAddressTableLookup},
MessageHeader, VersionedMessage,
},
pubkey::Pubkey,
signature::Signature,
transaction::TransactionError,
};
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap};
use tokio::sync::broadcast::Sender;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
SubscribeRequestFilterSlots, SubscribeUpdateBlock,
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdateBlock,
};
pub fn create_block_processing_task(
@ -44,7 +32,7 @@ pub fn create_block_processing_task(
},
);
let commitment_config = match commitment_level {
let _commitment_config = match commitment_level {
CommitmentLevel::Confirmed => CommitmentConfig::confirmed(),
CommitmentLevel::Finalized => CommitmentConfig::finalized(),
CommitmentLevel::Processed => CommitmentConfig::processed(),

View File

@ -1,21 +1,17 @@
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::ops::{Add, Deref, Sub};
use std::pin::{pin, Pin};
use anyhow::{bail, Context};
use std::collections::{HashMap};
use std::ops::{Add};
use anyhow::{Context};
use async_stream::stream;
use futures::{Stream, StreamExt};
use itertools::{Itertools};
use log::{debug, error, info, warn};
use log::{debug, info, warn};
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::{select};
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::{sleep, Duration, timeout, Instant, sleep_until};
use tokio::sync::broadcast::{Sender};
use tokio::time::{Duration, Instant, sleep_until};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock, SubscribeUpdateBlockMeta};
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeUpdate, SubscribeUpdateBlock};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
@ -28,7 +24,7 @@ use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;
pub fn create_multiplex(
grpc_sources: Vec<GrpcSourceConfig>,
commitment_level: CommitmentLevel,
block_sx: Sender<SubscribeUpdateBlock>,
block_sx: Sender<Box<SubscribeUpdateBlock>>,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
assert!(
commitment_level == CommitmentLevel::Confirmed
@ -36,7 +32,7 @@ pub fn create_multiplex(
"Only CONFIRMED and FINALIZED is supported");
// note: PROCESSED blocks are not sequential in presense of forks; this will break the logic
if grpc_sources.len() < 1 {
if grpc_sources.is_empty() {
panic!("Must have at least one source");
}
@ -47,7 +43,9 @@ pub fn create_multiplex(
CommitmentLevel::Processed => CommitmentConfig::processed(),
};
let jh = tokio::spawn(async move {
tokio::spawn(async move {
info!("Starting multiplexer with {} sources: {}",
grpc_sources.len(),
grpc_sources.iter().map(|source| source.label.clone()).join(", "));
@ -61,7 +59,7 @@ pub fn create_multiplex(
let mut current_slot: Slot = 0;
let mut start_stream33 = false;
let _start_stream33 = false;
'main_loop: loop {
let block_cmd = select! {
@ -91,20 +89,18 @@ pub fn create_multiplex(
}
}
});
return jh;
})
}
#[derive(Debug)]
enum BlockCmd {
ForwardBlock(SubscribeUpdateBlock),
ForwardBlock(Box<SubscribeUpdateBlock>),
DiscardBlockBehindTip(Slot),
// skip geyser messages which are not block related updates
SkipMessage,
}
fn map_filter_block_message(current_slot: Slot, update_message: SubscribeUpdate, commitment_config: CommitmentConfig) -> BlockCmd {
fn map_filter_block_message(current_slot: Slot, update_message: SubscribeUpdate, _commitment_config: CommitmentConfig) -> BlockCmd {
if let Some(UpdateOneof::Block(update_block_message)) = update_message.update_oneof {
if update_block_message.slot <= current_slot && current_slot != 0 {
// no progress - skip this
@ -114,9 +110,9 @@ fn map_filter_block_message(current_slot: Slot, update_message: SubscribeUpdate,
// expensive
// let produced_block = map_produced_block(update_block_message, commitment_config);
BlockCmd::ForwardBlock(update_block_message)
BlockCmd::ForwardBlock(Box::new(update_block_message))
} else {
return BlockCmd::SkipMessage;
BlockCmd::SkipMessage
}
}