support blocknotify

This commit is contained in:
Kirill Fomichev 2022-10-20 22:06:23 -03:00
parent 8301bbe292
commit 7be020a5a8
No known key found for this signature in database
GPG Key ID: 6AA0144D5E0C0C0A
7 changed files with 150 additions and 14 deletions

View File

@ -44,3 +44,7 @@ All fields in filter are optional but at least 1 is required. Fields works as lo
- `any` — stream all transactions - `any` — stream all transactions
- `vote` — enable/disable broadcast `vote` transactions - `vote` — enable/disable broadcast `vote` transactions
- `failed` — enable/disable broadcast `failed` transactions - `failed` — enable/disable broadcast `failed` transactions
#### Blocks
- `any` — stream all blocks

View File

@ -12,6 +12,7 @@ message SubscribeRequest {
map<string, SubscribeRequestFilterAccounts> accounts = 1; map<string, SubscribeRequestFilterAccounts> accounts = 1;
map<string, SubscribeRequestFilterSlots> slots = 2; map<string, SubscribeRequestFilterSlots> slots = 2;
map<string, SubscribeRequestFilterTransactions> transactions = 3; map<string, SubscribeRequestFilterTransactions> transactions = 3;
map<string, SubscribeRequestFilterBlocks> blocks = 4;
} }
message SubscribeRequestFilterAccounts { message SubscribeRequestFilterAccounts {
@ -30,12 +31,17 @@ message SubscribeRequestFilterTransactions {
bool failed = 3; bool failed = 3;
} }
message SubscribeRequestFilterBlocks {
bool any = 1;
}
message SubscribeUpdate { message SubscribeUpdate {
repeated string filters = 1; repeated string filters = 1;
oneof update_oneof { oneof update_oneof {
SubscribeUpdateAccount account = 2; SubscribeUpdateAccount account = 2;
SubscribeUpdateSlot slot = 3; SubscribeUpdateSlot slot = 3;
SubscribeUpdateTransaction transaction = 4; SubscribeUpdateTransaction transaction = 4;
SubscribeUpdateBlock block = 5;
} }
} }
@ -80,3 +86,11 @@ message SubscribeUpdateTransactionInfo {
solana.storage.ConfirmedBlock.TransactionStatusMeta meta = 4; solana.storage.ConfirmedBlock.TransactionStatusMeta meta = 4;
// uint64 index = 5; // uint64 index = 5;
} }
message SubscribeUpdateBlock {
uint64 slot = 1;
string blockhash = 2;
solana.storage.ConfirmedBlock.Rewards rewards = 3;
solana.storage.ConfirmedBlock.UnixTimestamp block_time = 4;
solana.storage.ConfirmedBlock.BlockHeight block_height = 5;
}

View File

@ -3,7 +3,8 @@ use {
futures::stream::StreamExt, futures::stream::StreamExt,
solana_geyser_grpc::proto::{ solana_geyser_grpc::proto::{
geyser_client::GeyserClient, SubscribeRequest, SubscribeRequestFilterAccounts, geyser_client::GeyserClient, SubscribeRequest, SubscribeRequestFilterAccounts,
SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeRequestFilterBlocks, SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
}, },
std::collections::HashMap, std::collections::HashMap,
tonic::Request, tonic::Request,
@ -16,10 +17,6 @@ struct Args {
/// Service endpoint /// Service endpoint
endpoint: String, endpoint: String,
#[clap(long)]
/// Subscribe on slots updates
slots: bool,
#[clap(long)] #[clap(long)]
/// Subscribe on accounts updates /// Subscribe on accounts updates
accounts: bool, accounts: bool,
@ -32,6 +29,10 @@ struct Args {
/// Filter by Owner Pubkey /// Filter by Owner Pubkey
owner: Vec<String>, owner: Vec<String>,
#[clap(long)]
/// Subscribe on slots updates
slots: bool,
#[clap(long)] #[clap(long)]
/// Subscribe on transactions updates /// Subscribe on transactions updates
transactions: bool, transactions: bool,
@ -43,6 +44,10 @@ struct Args {
#[clap(short, long)] #[clap(short, long)]
/// Filter failed transactions /// Filter failed transactions
failed: bool, failed: bool,
#[clap(long)]
/// Subscribe on block updates
blocks: bool,
} }
#[tokio::main] #[tokio::main]
@ -81,11 +86,20 @@ async fn main() -> anyhow::Result<()> {
); );
} }
let mut blocks = HashMap::new();
if args.blocks {
blocks.insert(
"client".to_owned(),
SubscribeRequestFilterBlocks { any: true },
);
}
let mut client = GeyserClient::connect(args.endpoint).await?; let mut client = GeyserClient::connect(args.endpoint).await?;
let request = Request::new(SubscribeRequest { let request = Request::new(SubscribeRequest {
slots, slots,
accounts, accounts,
transactions, transactions,
blocks,
}); });
let response = client.subscribe(request).await?; let response = client.subscribe(request).await?;
let mut stream = response.into_inner(); let mut stream = response.into_inner();

View File

@ -1,9 +1,9 @@
use { use {
crate::{ crate::{
grpc::{Message, MessageAccount, MessageSlot, MessageTransaction}, grpc::{Message, MessageAccount, MessageBlock, MessageSlot, MessageTransaction},
proto::{ proto::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
SubscribeRequestFilterTransactions, SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions,
}, },
}, },
solana_sdk::pubkey::Pubkey, solana_sdk::pubkey::Pubkey,
@ -20,6 +20,7 @@ pub struct Filter {
accounts: FilterAccounts, accounts: FilterAccounts,
slots: FilterSlots, slots: FilterSlots,
transactions: FilterTransactions, transactions: FilterTransactions,
blocks: FilterBlocks,
} }
impl TryFrom<&SubscribeRequest> for Filter { impl TryFrom<&SubscribeRequest> for Filter {
@ -30,6 +31,7 @@ impl TryFrom<&SubscribeRequest> for Filter {
accounts: FilterAccounts::try_from(&config.accounts)?, accounts: FilterAccounts::try_from(&config.accounts)?,
slots: FilterSlots::try_from(&config.slots)?, slots: FilterSlots::try_from(&config.slots)?,
transactions: FilterTransactions::try_from(&config.transactions)?, transactions: FilterTransactions::try_from(&config.transactions)?,
blocks: FilterBlocks::try_from(&config.blocks)?,
}) })
} }
} }
@ -40,6 +42,7 @@ impl Filter {
Message::Account(message) => self.accounts.get_filters(message), Message::Account(message) => self.accounts.get_filters(message),
Message::Slot(message) => self.slots.get_filters(message), Message::Slot(message) => self.slots.get_filters(message),
Message::Transaction(message) => self.transactions.get_filters(message), Message::Transaction(message) => self.transactions.get_filters(message),
Message::Block(message) => self.blocks.get_filters(message),
} }
} }
} }
@ -310,3 +313,37 @@ impl FilterTransactions {
.collect() .collect()
} }
} }
#[derive(Debug, Default)]
struct FilterBlocks {
filters: Vec<String>,
}
impl TryFrom<&HashMap<String, SubscribeRequestFilterBlocks>> for FilterBlocks {
type Error = anyhow::Error;
fn try_from(
configs: &HashMap<String, SubscribeRequestFilterBlocks>,
) -> Result<Self, Self::Error> {
Ok(FilterBlocks {
filters: configs
.iter()
.filter_map(
|(name, filter)| {
if filter.any {
Some(name.clone())
} else {
None
}
},
)
.collect(),
})
}
}
impl FilterBlocks {
fn get_filters(&self, _message: &MessageBlock) -> Vec<String> {
self.filters.clone()
}
}

View File

@ -7,16 +7,20 @@ use {
geyser_server::{Geyser, GeyserServer}, geyser_server::{Geyser, GeyserServer},
subscribe_update::UpdateOneof, subscribe_update::UpdateOneof,
SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo,
SubscribeUpdateSlot, SubscribeUpdateSlotStatus, SubscribeUpdateTransaction, SubscribeUpdateBlock, SubscribeUpdateSlot, SubscribeUpdateSlotStatus,
SubscribeUpdateTransactionInfo, SubscribeUpdateTransaction, SubscribeUpdateTransactionInfo,
}, },
}, },
log::*, log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{ solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaAccountInfoVersions, ReplicaTransactionInfoVersions, SlotStatus, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions,
SlotStatus,
}, },
solana_sdk::{pubkey::Pubkey, signature::Signature, transaction::VersionedTransaction}, solana_sdk::{
solana_transaction_status::TransactionStatusMeta, clock::UnixTimestamp, pubkey::Pubkey, signature::Signature,
transaction::VersionedTransaction,
},
solana_transaction_status::{Reward, TransactionStatusMeta},
std::{ std::{
collections::HashMap, collections::HashMap,
sync::atomic::{AtomicUsize, Ordering}, sync::atomic::{AtomicUsize, Ordering},
@ -124,11 +128,35 @@ impl<'a> From<(ReplicaTransactionInfoVersions<'a>, u64)> for MessageTransaction
} }
} }
#[derive(Debug)]
pub struct MessageBlock {
pub slot: u64,
pub blockhash: String,
pub rewards: Vec<Reward>,
pub block_time: Option<UnixTimestamp>,
pub block_height: Option<u64>,
}
impl<'a> From<ReplicaBlockInfoVersions<'a>> for MessageBlock {
fn from(blockinfo: ReplicaBlockInfoVersions<'a>) -> Self {
match blockinfo {
ReplicaBlockInfoVersions::V0_0_1(info) => Self {
slot: info.slot,
blockhash: info.blockhash.to_string(),
rewards: info.rewards.into(),
block_time: info.block_time,
block_height: info.block_height,
},
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum Message { pub enum Message {
Slot(MessageSlot), Slot(MessageSlot),
Account(MessageAccount), Account(MessageAccount),
Transaction(MessageTransaction), Transaction(MessageTransaction),
Block(MessageBlock),
} }
impl From<&Message> for UpdateOneof { impl From<&Message> for UpdateOneof {
@ -162,6 +190,13 @@ impl From<&Message> for UpdateOneof {
}), }),
slot: message.slot, slot: message.slot,
}), }),
Message::Block(message) => UpdateOneof::Block(SubscribeUpdateBlock {
slot: message.slot,
blockhash: message.blockhash.clone(),
rewards: Some(message.rewards.as_slice().into()),
block_time: message.block_time.map(|v| v.into()),
block_height: message.block_height.map(|v| v.into()),
}),
} }
} }
} }

View File

@ -5,7 +5,7 @@ use {
prom::{PrometheusService, SLOT_STATUS}, prom::{PrometheusService, SLOT_STATUS},
}, },
solana_geyser_plugin_interface::geyser_plugin_interface::{ solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
}, },
std::time::Duration, std::time::Duration,
@ -114,6 +114,17 @@ impl GeyserPlugin for Plugin {
Ok(()) Ok(())
} }
fn notify_block_metadata(
&mut self,
blockinfo: ReplicaBlockInfoVersions<'_>,
) -> PluginResult<()> {
let inner = self.inner.as_ref().expect("initialized");
let message = Message::Block(blockinfo.into());
let _ = inner.grpc_channel.send(message);
Ok(())
}
fn transaction_notifications_enabled(&self) -> bool { fn transaction_notifications_enabled(&self) -> bool {
true true
} }

View File

@ -16,6 +16,7 @@ pub use solana::storage::confirmed_block::*;
mod convert { mod convert {
use { use {
solana_sdk::{ solana_sdk::{
clock::UnixTimestamp,
instruction::CompiledInstruction, instruction::CompiledInstruction,
message::{ message::{
legacy::Message as LegacyMessage, v0::MessageAddressTableLookup, MessageHeader, legacy::Message as LegacyMessage, v0::MessageAddressTableLookup, MessageHeader,
@ -225,4 +226,24 @@ mod convert {
} }
} }
} }
impl From<&[Reward]> for super::Rewards {
fn from(rewards: &[Reward]) -> Self {
Self {
rewards: rewards.iter().map(|v| v.into()).collect(),
}
}
}
impl From<u64> for super::BlockHeight {
fn from(block_height: u64) -> Self {
Self { block_height }
}
}
impl From<UnixTimestamp> for super::UnixTimestamp {
fn from(timestamp: UnixTimestamp) -> Self {
Self { timestamp }
}
}
} }