Add transactions to block message, new block meta message (#27)

This commit is contained in:
Kirill Fomichev 2023-01-12 11:37:58 -03:00 committed by GitHub
parent 7295fb324a
commit 1d0d4dcbf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 188 additions and 26 deletions

2
Cargo.lock generated
View File

@ -2554,7 +2554,7 @@ dependencies = [
[[package]]
name = "solana-geyser-grpc"
version = "0.2.0"
version = "0.3.0+solana.1.14.10"
dependencies = [
"anyhow",
"bincode",

View File

@ -1,6 +1,6 @@
[package]
name = "solana-geyser-grpc"
version = "0.2.0"
version = "0.3.0+solana.1.14.10"
authors = ["Triton One"]
edition = "2021"

View File

@ -57,6 +57,10 @@ If all fields are empty then all transactions are broadcasted. Otherwise fields
Currently all blocks are broadcasted.
#### Blocks meta
Same as `Blocks` but without `transactions`.
### Limit filters
It's possible to add limits for filters in config. If `filters` field is omitted then filters doesn't have any limits.

View File

@ -13,6 +13,7 @@ message SubscribeRequest {
map<string, SubscribeRequestFilterSlots> slots = 2;
map<string, SubscribeRequestFilterTransactions> transactions = 3;
map<string, SubscribeRequestFilterBlocks> blocks = 4;
map<string, SubscribeRequestFilterBlocksMeta> blocks_meta = 5;
}
message SubscribeRequestFilterAccounts {
@ -31,6 +32,8 @@ message SubscribeRequestFilterTransactions {
message SubscribeRequestFilterBlocks {}
message SubscribeRequestFilterBlocksMeta {}
message SubscribeUpdate {
repeated string filters = 1;
oneof update_oneof {
@ -39,6 +42,7 @@ message SubscribeUpdate {
SubscribeUpdateTransaction transaction = 4;
SubscribeUpdateBlock block = 5;
SubscribeUpdatePing ping = 6;
SubscribeUpdateBlockMeta block_meta = 7;
}
}
@ -90,6 +94,15 @@ message SubscribeUpdateBlock {
solana.storage.ConfirmedBlock.Rewards rewards = 3;
solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4;
solana.storage.ConfirmedBlock.BlockHeight block_height = 5;
repeated SubscribeUpdateTransactionInfo transactions = 6;
}
message SubscribeUpdateBlockMeta {
uint64 slot = 1;
string blockhash = 2;
solana.storage.ConfirmedBlock.Rewards rewards = 3;
solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4;
solana.storage.ConfirmedBlock.BlockHeight block_height = 5;
}
message SubscribeUpdatePing {}

View File

@ -3,8 +3,8 @@ use {
futures::stream::{once, StreamExt},
solana_geyser_grpc::proto::{
geyser_client::GeyserClient, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
},
std::collections::HashMap,
tonic::transport::{channel::ClientTlsConfig, Endpoint, Uri},
@ -56,6 +56,10 @@ struct Args {
#[clap(long)]
/// Subscribe on block updates
blocks: bool,
#[clap(long)]
/// Subscribe on block meta updates (without transactions)
blocks_meta: bool,
}
#[tokio::main]
@ -96,6 +100,11 @@ async fn main() -> anyhow::Result<()> {
blocks.insert("client".to_owned(), SubscribeRequestFilterBlocks {});
}
let mut blocks_meta = HashMap::new();
if args.blocks_meta {
blocks_meta.insert("client".to_owned(), SubscribeRequestFilterBlocksMeta {});
}
let mut endpoint = Endpoint::from_shared(args.endpoint.clone())?;
let uri: Uri = args.endpoint.parse()?;
if uri.scheme_str() == Some("https") {
@ -108,6 +117,7 @@ async fn main() -> anyhow::Result<()> {
accounts,
transactions,
blocks,
blocks_meta,
};
println!("Going to send request: {:?}", request);

View File

@ -82,6 +82,7 @@ pub struct ConfigGrpcFilters {
pub slots: ConfigGrpcFiltersSlots,
pub transactions: ConfigGrpcFiltersTransactions,
pub blocks: ConfigGrpcFiltersBlocks,
pub blocks_meta: ConfigGrpcFiltersBlocksMeta,
}
impl ConfigGrpcFilters {
@ -157,6 +158,12 @@ pub struct ConfigGrpcFiltersBlocks {
pub max: usize,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFiltersBlocksMeta {
pub max: usize,
}
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigPrometheus {

View File

@ -2,12 +2,16 @@ use {
crate::{
config::{
ConfigGrpcFilters, ConfigGrpcFiltersAccounts, ConfigGrpcFiltersBlocks,
ConfigGrpcFiltersSlots, ConfigGrpcFiltersTransactions,
ConfigGrpcFiltersBlocksMeta, ConfigGrpcFiltersSlots, ConfigGrpcFiltersTransactions,
},
grpc::{
Message, MessageAccount, MessageBlock, MessageBlockMeta, MessageSlot,
MessageTransaction,
},
grpc::{Message, MessageAccount, MessageBlock, MessageSlot, MessageTransaction},
proto::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
},
},
solana_sdk::pubkey::Pubkey,
@ -25,6 +29,7 @@ pub struct Filter {
slots: FilterSlots,
transactions: FilterTransactions,
blocks: FilterBlocks,
blocks_meta: FilterBlocksMeta,
}
impl Filter {
@ -40,6 +45,7 @@ impl Filter {
limit.map(|v| &v.transactions),
)?,
blocks: FilterBlocks::new(&config.blocks, limit.map(|v| &v.blocks))?,
blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, limit.map(|v| &v.blocks_meta))?,
})
}
@ -67,6 +73,7 @@ impl Filter {
Message::Slot(message) => self.slots.get_filters(message),
Message::Transaction(message) => self.transactions.get_filters(message),
Message::Block(message) => self.blocks.get_filters(message),
Message::BlockMeta(message) => self.blocks_meta.get_filters(message),
}
}
}
@ -223,7 +230,7 @@ impl FilterSlots {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
Ok(FilterSlots {
Ok(Self {
filters: configs
.iter()
// .filter_map(|(name, _filter)| Some(name.clone()))
@ -356,7 +363,7 @@ impl FilterBlocks {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
Ok(FilterBlocks {
Ok(Self {
filters: configs
.iter()
// .filter_map(|(name, _filter)| Some(name.clone()))
@ -369,3 +376,31 @@ impl FilterBlocks {
self.filters.clone()
}
}
#[derive(Debug, Default)]
struct FilterBlocksMeta {
filters: Vec<String>,
}
impl FilterBlocksMeta {
fn new(
configs: &HashMap<String, SubscribeRequestFilterBlocksMeta>,
limit: Option<&ConfigGrpcFiltersBlocksMeta>,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
Ok(Self {
filters: configs
.iter()
// .filter_map(|(name, _filter)| Some(name.clone()))
.map(|(name, _filter)| name.clone())
.collect(),
})
}
fn get_filters(&self, _message: &MessageBlockMeta) -> Vec<String> {
self.filters.clone()
}
}

View File

@ -7,8 +7,9 @@ use {
geyser_server::{Geyser, GeyserServer},
subscribe_update::UpdateOneof,
SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo,
SubscribeUpdateBlock, SubscribeUpdatePing, SubscribeUpdateSlot,
SubscribeUpdateSlotStatus, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo,
SubscribeUpdateBlock, SubscribeUpdateBlockMeta, SubscribeUpdatePing,
SubscribeUpdateSlot, SubscribeUpdateSlotStatus, SubscribeUpdateTransaction,
SubscribeUpdateTransactionInfo,
},
},
log::*,
@ -102,7 +103,7 @@ impl From<(u64, Option<u64>, SlotStatus)> for MessageSlot {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MessageTransactionInfo {
pub signature: Signature,
pub is_vote: bool,
@ -111,6 +112,18 @@ pub struct MessageTransactionInfo {
pub index: usize,
}
impl From<&MessageTransactionInfo> for SubscribeUpdateTransactionInfo {
fn from(tx: &MessageTransactionInfo) -> Self {
Self {
signature: tx.signature.as_ref().into(),
is_vote: tx.is_vote,
transaction: Some((&tx.transaction).into()),
meta: Some((&tx.meta).into()),
index: tx.index as u64,
}
}
}
#[derive(Debug)]
pub struct MessageTransaction {
pub transaction: MessageTransactionInfo,
@ -144,10 +157,45 @@ pub struct MessageBlock {
pub rewards: Vec<Reward>,
pub block_time: Option<UnixTimestamp>,
pub block_height: Option<u64>,
pub transactions: Vec<MessageTransactionInfo>,
}
impl<'a> From<ReplicaBlockInfoVersions<'a>> for MessageBlock {
fn from(blockinfo: ReplicaBlockInfoVersions<'a>) -> Self {
impl<'a>
From<(
&'a ReplicaBlockInfoVersions<'a>,
Vec<MessageTransactionInfo>,
)> for MessageBlock
{
fn from(
(blockinfo, transactions): (
&'a ReplicaBlockInfoVersions<'a>,
Vec<MessageTransactionInfo>,
),
) -> Self {
match blockinfo {
ReplicaBlockInfoVersions::V0_0_1(info) => Self {
slot: info.slot,
blockhash: info.blockhash.to_string(),
rewards: info.rewards.into(),
block_time: info.block_time,
block_height: info.block_height,
transactions,
},
}
}
}
#[derive(Debug)]
pub struct MessageBlockMeta {
pub slot: u64,
pub blockhash: String,
pub rewards: Vec<Reward>,
pub block_time: Option<UnixTimestamp>,
pub block_height: Option<u64>,
}
impl<'a> From<&'a ReplicaBlockInfoVersions<'a>> for MessageBlockMeta {
fn from(blockinfo: &'a ReplicaBlockInfoVersions<'a>) -> Self {
match blockinfo {
ReplicaBlockInfoVersions::V0_0_1(info) => Self {
slot: info.slot,
@ -166,6 +214,7 @@ pub enum Message {
Account(MessageAccount),
Transaction(MessageTransaction),
Block(MessageBlock),
BlockMeta(MessageBlockMeta),
}
impl From<&Message> for UpdateOneof {
@ -191,13 +240,7 @@ impl From<&Message> for UpdateOneof {
is_startup: message.is_startup,
}),
Message::Transaction(message) => UpdateOneof::Transaction(SubscribeUpdateTransaction {
transaction: Some(SubscribeUpdateTransactionInfo {
signature: message.transaction.signature.as_ref().into(),
is_vote: message.transaction.is_vote,
transaction: Some((&message.transaction.transaction).into()),
meta: Some((&message.transaction.meta).into()),
index: message.transaction.index as u64,
}),
transaction: Some((&message.transaction).into()),
slot: message.slot,
}),
Message::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock {
@ -206,6 +249,14 @@ impl From<&Message> for UpdateOneof {
rewards: Some(message.rewards.as_slice().into()),
block_time: message.block_time.map(|v| v.into()),
block_height: message.block_height.map(|v| v.into()),
transactions: message.transactions.iter().map(Into::into).collect(),
}),
Message::BlockMeta(message) => UpdateOneof::BlockMeta(SubscribeUpdateBlockMeta {
slot: message.slot,
blockhash: message.blockhash.clone(),
rewards: Some(message.rewards.as_slice().into()),
block_time: message.block_time.map(|v| v.into()),
block_height: message.block_height.map(|v| v.into()),
}),
}
}
@ -359,6 +410,7 @@ impl Geyser for GrpcService {
slots: HashMap::new(),
transactions: HashMap::new(),
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
},
self.config.filters.as_ref(),
)

View File

@ -1,7 +1,7 @@
use {
crate::{
config::Config,
grpc::{GrpcService, Message},
grpc::{GrpcService, Message, MessageTransaction, MessageTransactionInfo},
prom::{PrometheusService, SLOT_STATUS},
},
solana_geyser_plugin_interface::geyser_plugin_interface::{
@ -21,6 +21,7 @@ pub struct PluginInner {
grpc_channel: mpsc::UnboundedSender<Message>,
grpc_shutdown_tx: oneshot::Sender<()>,
prometheus: PrometheusService,
transactions: Option<(u64, Vec<MessageTransactionInfo>)>,
}
#[derive(Debug, Default)]
@ -54,6 +55,7 @@ impl GeyserPlugin for Plugin {
grpc_channel,
grpc_shutdown_tx,
prometheus,
transactions: None,
});
Ok(())
@ -107,8 +109,26 @@ impl GeyserPlugin for Plugin {
transaction: ReplicaTransactionInfoVersions<'_>,
slot: u64,
) -> PluginResult<()> {
let inner = self.inner.as_ref().expect("initialized");
let message = Message::Transaction((transaction, slot).into());
let inner = self.inner.as_mut().expect("initialized");
let msg_tx: MessageTransaction = (transaction, slot).into();
match &mut inner.transactions {
Some((current_slot, transactions)) if *current_slot == slot => {
transactions.push(msg_tx.transaction.clone());
}
Some((current_slot, _)) => {
log::error!(
"got tx from block {}, while current block is {}",
slot,
current_slot
);
}
None => {
inner.transactions = Some((slot, vec![msg_tx.transaction.clone()]));
}
}
let message = Message::Transaction(msg_tx);
let _ = inner.grpc_channel.send(message);
Ok(())
@ -118,8 +138,29 @@ impl GeyserPlugin for Plugin {
&mut self,
blockinfo: ReplicaBlockInfoVersions<'_>,
) -> PluginResult<()> {
let inner = self.inner.as_ref().expect("initialized");
let message = Message::Block(blockinfo.into());
let inner = self.inner.as_mut().expect("initialized");
let ReplicaBlockInfoVersions::V0_0_1(block) = &blockinfo;
let transactions = match inner.transactions.take() {
Some((slot, transactions)) if slot == block.slot => transactions,
Some((slot, _)) => {
let msg = format!(
"invalid transactions for block {}, found {}",
block.slot, slot
);
log::error!("{}", msg);
return Err(GeyserPluginError::Custom(msg.into()));
}
None => {
let msg = format!("no transactions for block {}", block.slot);
log::error!("{}", msg);
return Err(GeyserPluginError::Custom(msg.into()));
}
};
let message = Message::Block((&blockinfo, transactions).into());
let _ = inner.grpc_channel.send(message);
let message = Message::BlockMeta((&blockinfo).into());
let _ = inner.grpc_channel.send(message);
Ok(())