change accounts filter

This commit is contained in:
Kirill Fomichev 2022-10-19 19:19:59 -03:00
parent c7de7fae09
commit a0aa76b51d
No known key found for this signature in database
GPG Key ID: 6AA0144D5E0C0C0A
9 changed files with 403 additions and 114 deletions

2
Cargo.lock generated
View File

@ -2398,7 +2398,6 @@ name = "solana-geyser-grpc"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bs58",
"cargo-lock", "cargo-lock",
"clap", "clap",
"futures", "futures",
@ -2412,6 +2411,7 @@ dependencies = [
"serde_json", "serde_json",
"solana-geyser-plugin-interface", "solana-geyser-plugin-interface",
"solana-logger", "solana-logger",
"solana-sdk",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tonic", "tonic",

View File

@ -9,7 +9,6 @@ crate-type = ["cdylib", "rlib"]
[dependencies] [dependencies]
anyhow = "1" anyhow = "1"
bs58 = "0.4"
clap = { version = "3", features = ["cargo", "derive"] } clap = { version = "3", features = ["cargo", "derive"] }
futures = "0.3" futures = "0.3"
hyper = { version = "0.14", features = ["server"] } hyper = { version = "0.14", features = ["server"] }
@ -21,6 +20,7 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
solana-geyser-plugin-interface = "=1.10.40" solana-geyser-plugin-interface = "=1.10.40"
solana-logger = "=1.10.40" solana-logger = "=1.10.40"
solana-sdk = "=1.10.40"
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "time"] } tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "time"] }
tokio-stream = "0.1" tokio-stream = "0.1"
tonic = { version = "0.8", features = ["gzip"] } tonic = { version = "0.8", features = ["gzip"] }

View File

@ -22,3 +22,20 @@ new message: Ok(SubscribeUpdate { update_oneof: Some(Account(SubscribeUpdateAcco
new message: Ok(SubscribeUpdate { update_oneof: Some(Slot(SubscribeUpdateSlot { slot: 3159, parent: None, status: Confirmed })) }) new message: Ok(SubscribeUpdate { update_oneof: Some(Slot(SubscribeUpdateSlot { slot: 3159, parent: None, status: Confirmed })) })
^C ^C
``` ```
### Filters
See [proto/geyser.proto](proto/geyser.proto).
#### Slots
- `enabled` — broadcast slots updates
#### Account
Accounts can be filtered by:
- `account` — account Pubkey, matches any Pubkey from the array
- `owner` — account owner Pubkey, match to any Pubkey from the array
All fields in a filter are optional; if every field is empty, all accounts are broadcast. Fields are combined with logical `AND`. Values within an array are combined with logical `OR`.

View File

@ -1,5 +1,5 @@
{ {
"libpath": "../target/debug/libsolana_geyser_grpc.so", "libpath": "target/debug/libsolana_geyser_grpc.so",
"log": { "log": {
"level": "info" "level": "info"
}, },

View File

@ -7,9 +7,18 @@ service Geyser {
} }
message SubscribeRequest { message SubscribeRequest {
bool any = 1; SubscribeRequestSlots slots = 1;
repeated string accounts = 2; repeated SubscribeRequestAccounts accounts = 2;
repeated string owners = 3; }
message SubscribeRequestSlots {
bool enabled = 1;
}
message SubscribeRequestAccounts {
string filter = 1;
repeated string account = 2;
repeated string owner = 3;
} }
message SubscribeUpdate { message SubscribeUpdate {
@ -20,9 +29,10 @@ message SubscribeUpdate {
} }
message SubscribeUpdateAccount { message SubscribeUpdateAccount {
SubscribeUpdateAccountInfo account = 1; repeated string filters = 1;
uint64 slot = 2; SubscribeUpdateAccountInfo account = 2;
bool is_startup = 3; uint64 slot = 3;
bool is_startup = 4;
} }
message SubscribeUpdateAccountInfo { message SubscribeUpdateAccountInfo {
@ -33,6 +43,7 @@ message SubscribeUpdateAccountInfo {
uint64 rent_epoch = 5; uint64 rent_epoch = 5;
bytes data = 6; bytes data = 6;
uint64 write_version = 7; uint64 write_version = 7;
optional bytes txn_signature = 8;
} }
message SubscribeUpdateSlot { message SubscribeUpdateSlot {

View File

@ -1,7 +1,10 @@
use { use {
clap::Parser, clap::Parser,
futures::stream::StreamExt, futures::stream::StreamExt,
solana_geyser_grpc::grpc::proto::{geyser_client::GeyserClient, SubscribeRequest}, solana_geyser_grpc::grpc::proto::{
geyser_client::GeyserClient, SubscribeRequest, SubscribeRequestAccounts,
SubscribeRequestSlots,
},
tonic::Request, tonic::Request,
}; };
@ -13,14 +16,14 @@ struct Args {
endpoint: String, endpoint: String,
#[clap(short, long)] #[clap(short, long)]
/// Stream all accounts /// Subscribe on slots updates
any: bool, slots: bool,
#[clap(short, long, conflicts_with = "any")] #[clap(short, long)]
/// Filter by Account Pubkey /// Filter by Account Pubkey
accounts: Vec<String>, account: Vec<String>,
#[clap(short, long, conflicts_with = "any")] #[clap(short, long)]
/// Filter by Owner Pubkey /// Filter by Owner Pubkey
owner: Vec<String>, owner: Vec<String>,
} }
@ -31,16 +34,24 @@ async fn main() -> anyhow::Result<()> {
let mut client = GeyserClient::connect(args.endpoint).await?; let mut client = GeyserClient::connect(args.endpoint).await?;
let request = Request::new(SubscribeRequest { let request = Request::new(SubscribeRequest {
any: args.any, slots: Some(SubscribeRequestSlots {
accounts: args.accounts, enabled: args.slots,
owners: args.owner, }),
accounts: vec![SubscribeRequestAccounts {
filter: "client".to_owned(),
account: args.account,
owner: args.owner,
}],
}); });
let response = client.subscribe(request).await?; let response = client.subscribe(request).await?;
let mut stream = response.into_inner(); let mut stream = response.into_inner();
println!("stream opened"); println!("stream opened");
while let Some(message) = stream.next().await { while let Some(message) = stream.next().await {
println!("new message: {:?}", message); match message {
Ok(message) => println!("new message: {:?}", message),
Err(error) => eprintln!("error: {:?}", error),
}
} }
println!("stream closed"); println!("stream closed");

View File

@ -1,49 +1,210 @@
use std::collections::HashSet; use {
crate::grpc::proto::{SubscribeRequest, SubscribeRequestAccounts, SubscribeRequestSlots},
solana_sdk::pubkey::Pubkey,
std::{
collections::{HashMap, HashSet},
convert::{From, TryFrom},
hash::Hash,
str::FromStr,
},
};
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Default)]
pub struct AccountsFilter { struct FilterSlots {
any: bool, enabled: bool,
accounts: HashSet<Vec<u8>>,
owners: HashSet<Vec<u8>>,
} }
impl Default for AccountsFilter { impl From<&SubscribeRequestSlots> for FilterSlots {
fn default() -> Self { fn from(config: &SubscribeRequestSlots) -> Self {
Self { FilterSlots {
any: true, enabled: config.enabled,
accounts: HashSet::default(),
owners: HashSet::default(),
} }
} }
} }
impl AccountsFilter { #[derive(Debug)]
pub fn new<T1, T2>(any: bool, accounts: &[T1], owners: &[T2]) -> anyhow::Result<Self> struct FilterAccountsExistence {
where account: bool,
for<'a> T1: AsRef<[u8]> + std::cmp::PartialEq<&'a str> + std::fmt::Debug, owner: bool,
T2: AsRef<[u8]> + std::fmt::Debug, }
{
anyhow::ensure!(
!any || accounts.is_empty() && owners.is_empty(),
"`any` is not allow non-empty `accouts` and `owners`"
);
anyhow::ensure!(accounts.len() < 10_000, "Maximum 10k accounts are allowed");
anyhow::ensure!(owners.len() < 10_000, "Maximum 10k owners are allowed");
Ok(AccountsFilter { impl FilterAccountsExistence {
any, fn is_empty(&self) -> bool {
accounts: accounts !(self.account || self.owner)
.iter() }
.map(|key| bs58::decode(key).into_vec()) }
.collect::<Result<_, _>>()?,
owners: owners #[derive(Debug, Default)]
.iter() struct FilterAccounts {
.map(|key| bs58::decode(key).into_vec()) filters: HashMap<String, FilterAccountsExistence>,
.collect::<Result<_, _>>()?, account: HashMap<Pubkey, HashSet<String>>,
}) account_required: HashSet<String>,
} owner: HashMap<Pubkey, HashSet<String>>,
owner_required: HashSet<String>,
pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool { }
self.any || self.accounts.contains(account) || self.owners.contains(owner)
impl TryFrom<&Vec<SubscribeRequestAccounts>> for FilterAccounts {
    type Error = anyhow::Error;

    /// Builds the pubkey lookup tables from the account filters of a request.
    ///
    /// # Errors
    ///
    /// Fails when an `account` or `owner` string cannot be parsed as a `Pubkey`,
    /// or when two entries use the same `filter` name.
    fn try_from(configs: &Vec<SubscribeRequestAccounts>) -> Result<Self, Self::Error> {
        let mut this = Self::default();

        for config in configs {
            let name = &config.filter;

            // Account keys are parsed and registered before owner keys are looked at.
            let accounts = config
                .account
                .iter()
                .map(|value| Pubkey::from_str(value))
                .collect::<Result<Vec<_>, _>>()?;
            let has_account = Self::set(
                &mut this.account,
                &mut this.account_required,
                name,
                accounts.into_iter(),
            );

            let owners = config
                .owner
                .iter()
                .map(|value| Pubkey::from_str(value))
                .collect::<Result<Vec<_>, _>>()?;
            let has_owner = Self::set(
                &mut this.owner,
                &mut this.owner_required,
                name,
                owners.into_iter(),
            );

            // A previous entry under the same name means the filter name was reused.
            let previous = this.filters.insert(
                name.clone(),
                FilterAccountsExistence {
                    account: has_account,
                    owner: has_owner,
                },
            );
            anyhow::ensure!(previous.is_none(), "filter {} duplicated", config.filter);
        }

        Ok(this)
    }
}
impl FilterAccounts {
    /// Registers the filter `name` under every key yielded by `keys` in `map`.
    ///
    /// Returns `true` when at least one key gained `name` as a new entry; in that
    /// case `name` is also added to `map_required`.
    fn set<Q, I>(
        map: &mut HashMap<Q, HashSet<String>>,
        map_required: &mut HashSet<String>,
        name: &str,
        keys: I,
    ) -> bool
    where
        Q: Hash + Eq + Clone,
        I: Iterator<Item = Q>,
    {
        // The insert is written first so it runs for every key, even once the flag is set.
        let required = keys.fold(false, |seen, key| {
            map.entry(key).or_default().insert(name.to_string()) || seen
        });

        if required {
            map_required.insert(name.to_string());
        }
        required
    }
}
/// Subscription filter built from a `SubscribeRequest`.
#[derive(Debug)]
pub struct Filter {
// Slot-update settings of the request.
slots: FilterSlots,
// Account filters of the request, indexed by account and owner pubkey.
accounts: FilterAccounts,
}
impl TryFrom<&SubscribeRequest> for Filter {
    type Error = anyhow::Error;

    /// Converts a subscribe request into a `Filter`.
    ///
    /// A request without a `slots` section uses the default `FilterSlots`.
    /// Any error from building the account filters is passed through.
    fn try_from(config: &SubscribeRequest) -> Result<Self, Self::Error> {
        let slots = match config.slots.as_ref() {
            Some(slots) => FilterSlots::from(slots),
            None => FilterSlots::default(),
        };
        let accounts = FilterAccounts::try_from(&config.accounts)?;

        Ok(Self { slots, accounts })
    }
}
impl Filter {
pub fn is_slots_enabled(&self) -> bool {
self.slots.enabled
}
pub fn create_accounts_match(&self) -> FilterAccountsMatch {
FilterAccountsMatch::new(&self.accounts)
}
}
/// Collects the filter names hit by `match_account` / `match_owner` lookups
/// against a borrowed `FilterAccounts`.
#[derive(Debug)]
pub struct FilterAccountsMatch<'a> {
// Account filter tables the lookups are made against.
filter: &'a FilterAccounts,
// Filter names matched so far by account pubkey.
account: HashSet<String>,
// Filter names matched so far by owner pubkey.
owner: HashSet<String>,
}
impl<'a> FilterAccountsMatch<'a> {
fn new(filter: &'a FilterAccounts) -> Self {
Self {
filter,
account: Default::default(),
owner: Default::default(),
}
}
/// Copies into `set` every filter name that `map` stores under `key`.
///
/// Returns `true` when `key` is present in `map` and `false` otherwise.
/// `set` is left untouched on a miss.
fn extend<Q: Hash + Eq>(
    set: &mut HashSet<String>,
    map: &HashMap<Q, HashSet<String>>,
    key: &Q,
) -> bool {
    let names = match map.get(key) {
        Some(names) => names,
        None => return false,
    };

    for name in names {
        // Check first so a name that is already recorded is not cloned again.
        if !set.contains(name) {
            set.insert(name.clone());
        }
    }
    true
}
/// Records the filters registered for the account `pubkey`; returns whether it is known.
pub fn match_account(&mut self, pubkey: &Pubkey) -> bool {
    let tables = self.filter;
    Self::extend(&mut self.account, &tables.account, pubkey)
}
/// Records the filters registered for the owner `pubkey`; returns whether it is known.
pub fn match_owner(&mut self, pubkey: &Pubkey) -> bool {
    let tables = self.filter;
    Self::extend(&mut self.owner, &tables.owner, pubkey)
}
pub fn get_filters(&self) -> Vec<String> {
self.filter
.filters
.iter()
.filter_map(|(name, existence)| {
if existence.is_empty() {
return Some(name.clone());
}
let name = name.as_str();
let af = &self.filter;
// If filter name in required but not in matched => return `false`
if af.account_required.contains(name) && !self.account.contains(name) {
return None;
}
if af.owner_required.contains(name) && !self.owner.contains(name) {
return None;
}
Some(name.to_string())
})
.collect()
} }
} }

View File

@ -1,15 +1,20 @@
use { use {
crate::{ crate::{
config::ConfigGrpc, config::ConfigGrpc,
filters::AccountsFilter, filters::Filter,
grpc::proto::{ grpc::proto::{
geyser_server::{Geyser, GeyserServer}, geyser_server::{Geyser, GeyserServer},
subscribe_update::UpdateOneof, subscribe_update::UpdateOneof,
SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo, SubscribeRequest, SubscribeUpdate, SubscribeUpdateAccount, SubscribeUpdateAccountInfo,
SubscribeUpdateSlot, SubscribeUpdateSlotStatus,
}, },
prom::CONNECTIONS_TOTAL, prom::CONNECTIONS_TOTAL,
}, },
log::*, log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaAccountInfoVersions, SlotStatus,
},
solana_sdk::{pubkey::Pubkey, signature::Signature},
std::{ std::{
collections::HashMap, collections::HashMap,
sync::atomic::{AtomicUsize, Ordering}, sync::atomic::{AtomicUsize, Ordering},
@ -28,10 +33,111 @@ pub mod proto {
tonic::include_proto!("geyser"); tonic::include_proto!("geyser");
} }
/// Account fields of an account update, held as owned values.
#[derive(Debug)]
pub struct UpdateAccountMessageAccount {
pub pubkey: Pubkey,
pub lamports: u64,
pub owner: Pubkey,
pub executable: bool,
pub rent_epoch: u64,
pub data: Vec<u8>,
pub write_version: u64,
// Set to `None` when built from `ReplicaAccountInfoVersions::V0_0_1`.
pub txn_signature: Option<Signature>,
}
/// Account update together with the slot and startup flag it was reported with.
#[derive(Debug)]
pub struct UpdateAccountMessage {
pub account: UpdateAccountMessageAccount,
pub slot: u64,
pub is_startup: bool,
}
impl<'a> From<(ReplicaAccountInfoVersions<'a>, u64, bool)> for UpdateAccountMessage {
    /// Copies the borrowed account info, slot and startup flag into an owned message.
    fn from((account, slot, is_startup): (ReplicaAccountInfoVersions<'a>, u64, bool)) -> Self {
        let account = match account {
            ReplicaAccountInfoVersions::V0_0_1(info) => UpdateAccountMessageAccount {
                pubkey: Pubkey::new(info.pubkey),
                lamports: info.lamports,
                owner: Pubkey::new(info.owner),
                executable: info.executable,
                rent_epoch: info.rent_epoch,
                // The borrowed bytes are copied into an owned buffer.
                data: info.data.to_vec(),
                write_version: info.write_version,
                txn_signature: None,
            },
        };

        Self {
            account,
            slot,
            is_startup,
        }
    }
}
impl UpdateAccountMessage {
    /// Builds the protobuf account update for this message, tagged with the
    /// names of the filters it matched.
    fn with_filters(&self, filters: Vec<String>) -> SubscribeUpdate {
        let source = &self.account;

        let info = SubscribeUpdateAccountInfo {
            pubkey: source.pubkey.as_ref().into(),
            lamports: source.lamports,
            owner: source.owner.as_ref().into(),
            executable: source.executable,
            rent_epoch: source.rent_epoch,
            data: source.data.clone(),
            write_version: source.write_version,
            txn_signature: source.txn_signature.map(|sig| sig.as_ref().into()),
        };

        let update = SubscribeUpdateAccount {
            account: Some(info),
            slot: self.slot,
            is_startup: self.is_startup,
            filters,
        };

        SubscribeUpdate {
            update_oneof: Some(UpdateOneof::Account(update)),
        }
    }
}
/// Slot update with the status already converted to the protobuf enum.
#[derive(Debug)]
pub struct UpdateSlotMessage {
slot: u64,
parent: Option<u64>,
status: SubscribeUpdateSlotStatus,
}
impl From<(u64, Option<u64>, SlotStatus)> for UpdateSlotMessage {
    /// Maps the plugin-interface slot status onto the protobuf status enum.
    fn from((slot, parent, status): (u64, Option<u64>, SlotStatus)) -> Self {
        let status = match status {
            SlotStatus::Processed => SubscribeUpdateSlotStatus::Processed,
            SlotStatus::Confirmed => SubscribeUpdateSlotStatus::Confirmed,
            SlotStatus::Rooted => SubscribeUpdateSlotStatus::Rooted,
        };

        Self {
            slot,
            parent,
            status,
        }
    }
}
impl From<&UpdateSlotMessage> for SubscribeUpdate {
    /// Wraps the slot message in the `Slot` variant of the update oneof.
    fn from(msg: &UpdateSlotMessage) -> Self {
        let update = SubscribeUpdateSlot {
            slot: msg.slot,
            parent: msg.parent,
            // The protobuf field stores the enum as its `i32` value.
            status: msg.status as i32,
        };

        Self {
            update_oneof: Some(UpdateOneof::Slot(update)),
        }
    }
}
/// Update carried on the channel that feeds the gRPC send loop.
#[derive(Debug)]
pub enum Message {
// Account update; checked against each client's account filters before sending.
UpdateAccount(UpdateAccountMessage),
// Slot update; sent only to clients whose filter has slots enabled.
UpdateSlot(UpdateSlotMessage),
}
#[derive(Debug)] #[derive(Debug)]
struct ClientConnection { struct ClientConnection {
id: usize, id: usize,
accounts_filter: AccountsFilter, filter: Filter,
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>, stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
} }
@ -46,7 +152,7 @@ impl GrpcService {
pub fn create( pub fn create(
config: ConfigGrpc, config: ConfigGrpc,
) -> Result< ) -> Result<
(mpsc::UnboundedSender<SubscribeUpdate>, oneshot::Sender<()>), (mpsc::UnboundedSender<Message>, oneshot::Sender<()>),
Box<dyn std::error::Error + Send + Sync>, Box<dyn std::error::Error + Send + Sync>,
> { > {
// Bind service address // Bind service address
@ -86,7 +192,7 @@ impl GrpcService {
} }
async fn send_loop( async fn send_loop(
mut update_channel_rx: mpsc::UnboundedReceiver<SubscribeUpdate>, mut update_channel_rx: mpsc::UnboundedReceiver<Message>,
mut new_clients_rx: mpsc::UnboundedReceiver<ClientConnection>, mut new_clients_rx: mpsc::UnboundedReceiver<ClientConnection>,
) { ) {
let mut clients: HashMap<usize, ClientConnection> = HashMap::new(); let mut clients: HashMap<usize, ClientConnection> = HashMap::new();
@ -97,20 +203,32 @@ impl GrpcService {
let mut ids_closed = vec![]; let mut ids_closed = vec![];
for client in clients.values() { for client in clients.values() {
let message = match &message.update_oneof { if let Some(message) = match &message {
Some(UpdateOneof::Account(SubscribeUpdateAccount { Message::UpdateAccount(message) => {
account: Some(SubscribeUpdateAccountInfo { pubkey, owner,.. }), let mut filter = client.filter.create_accounts_match();
.. filter.match_account(&message.account.pubkey);
})) if !client.accounts_filter.is_account_selected(pubkey, owner) => { filter.match_owner(&message.account.owner);
continue;
}
_ => message.clone(),
};
match client.stream_tx.try_send(Ok(message)) { let filters = filter.get_filters();
Ok(()) => {}, if !filters.is_empty() {
Err(mpsc::error::TrySendError::Full(_)) => ids_full.push(client.id), Some(message.with_filters(filters))
Err(mpsc::error::TrySendError::Closed(_)) => ids_closed.push(client.id), } else {
None
}
}
Message::UpdateSlot(message) => {
if client.filter.is_slots_enabled() {
Some(message.into())
} else {
None
}
}
} {
match client.stream_tx.try_send(Ok(message)) {
Ok(()) => {},
Err(mpsc::error::TrySendError::Full(_)) => ids_full.push(client.id),
Err(mpsc::error::TrySendError::Closed(_)) => ids_closed.push(client.id),
}
} }
} }
@ -152,8 +270,7 @@ impl Geyser for GrpcService {
let id = self.subscribe_id.fetch_add(1, Ordering::SeqCst); let id = self.subscribe_id.fetch_add(1, Ordering::SeqCst);
info!("{}, new subscriber", id); info!("{}, new subscriber", id);
let data = request.get_ref(); let filter = match Filter::try_from(request.get_ref()) {
let accounts_filter = match AccountsFilter::new(data.any, &data.accounts, &data.owners) {
Ok(filter) => filter, Ok(filter) => filter,
Err(error) => { Err(error) => {
let message = format!("failed to create filter: {:?}", error); let message = format!("failed to create filter: {:?}", error);
@ -165,7 +282,7 @@ impl Geyser for GrpcService {
let (stream_tx, stream_rx) = mpsc::channel(self.config.channel_capacity); let (stream_tx, stream_rx) = mpsc::channel(self.config.channel_capacity);
if let Err(_error) = self.new_clients_tx.send(ClientConnection { if let Err(_error) = self.new_clients_tx.send(ClientConnection {
id, id,
accounts_filter, filter,
stream_tx, stream_tx,
}) { }) {
return Err(Status::internal("")); return Err(Status::internal(""));

View File

@ -1,13 +1,7 @@
use { use {
crate::{ crate::{
config::Config, config::Config,
grpc::{ grpc::{GrpcService, Message},
proto::{
subscribe_update::UpdateOneof, SubscribeUpdate, SubscribeUpdateAccount,
SubscribeUpdateAccountInfo, SubscribeUpdateSlot, SubscribeUpdateSlotStatus,
},
GrpcService,
},
prom::{PrometheusService, SLOT_STATUS}, prom::{PrometheusService, SLOT_STATUS},
}, },
solana_geyser_plugin_interface::geyser_plugin_interface::{ solana_geyser_plugin_interface::geyser_plugin_interface::{
@ -24,7 +18,7 @@ use {
#[derive(Debug)] #[derive(Debug)]
pub struct PluginInner { pub struct PluginInner {
runtime: Runtime, runtime: Runtime,
grpc_channel: mpsc::UnboundedSender<SubscribeUpdate>, grpc_channel: mpsc::UnboundedSender<Message>,
grpc_shutdown_tx: oneshot::Sender<()>, grpc_shutdown_tx: oneshot::Sender<()>,
prometheus: PrometheusService, prometheus: PrometheusService,
} }
@ -81,23 +75,9 @@ impl GeyserPlugin for Plugin {
is_startup: bool, is_startup: bool,
) -> PluginResult<()> { ) -> PluginResult<()> {
let inner = self.inner.as_ref().expect("initialized"); let inner = self.inner.as_ref().expect("initialized");
let _ = inner.grpc_channel.send(match account { let _ = inner
ReplicaAccountInfoVersions::V0_0_1(info) => SubscribeUpdate { .grpc_channel
update_oneof: Some(UpdateOneof::Account(SubscribeUpdateAccount { .send(Message::UpdateAccount((account, slot, is_startup).into()));
account: Some(SubscribeUpdateAccountInfo {
pubkey: info.pubkey.into(),
lamports: info.lamports,
owner: info.owner.into(),
executable: info.executable,
rent_epoch: info.rent_epoch,
data: info.data.into(),
write_version: info.write_version,
}),
slot,
is_startup,
})),
},
});
Ok(()) Ok(())
} }
@ -109,17 +89,9 @@ impl GeyserPlugin for Plugin {
status: SlotStatus, status: SlotStatus,
) -> PluginResult<()> { ) -> PluginResult<()> {
let inner = self.inner.as_ref().expect("initialized"); let inner = self.inner.as_ref().expect("initialized");
let _ = inner.grpc_channel.send(SubscribeUpdate { let _ = inner
update_oneof: Some(UpdateOneof::Slot(SubscribeUpdateSlot { .grpc_channel
slot, .send(Message::UpdateSlot((slot, parent, status).into()));
parent,
status: match status {
SlotStatus::Processed => SubscribeUpdateSlotStatus::Processed,
SlotStatus::Confirmed => SubscribeUpdateSlotStatus::Confirmed,
SlotStatus::Rooted => SubscribeUpdateSlotStatus::Rooted,
} as i32,
})),
});
SLOT_STATUS SLOT_STATUS
.with_label_values(&[match status { .with_label_values(&[match status {