Add filter limits to config (#4)

This commit is contained in:
Kirill Fomichev 2022-10-25 14:52:29 -03:00 committed by GitHub
parent fc8f8afb05
commit 1b6e8b008d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 299 additions and 96 deletions

View File

@ -42,11 +42,43 @@ If all fields are empty then all accounts are broadcasted. Otherwise fields work
- `vote` — enable/disable broadcast `vote` transactions
- `failed` — enable/disable broadcast `failed` transactions
- `accounts_include` — filter transactions which use any account
- `accounts_exclude` — filter transactions which do not use any account
- `account_include` — filter transactions which use any account
- `account_exclude` — filter transactions which do not use any account
If all fields are empty then all transactions are broadcasted. Otherwise fields works as logical `AND` and values in arrays as logical `OR`.
#### Blocks
Currently all blocks are broadcasted.
### Limit filters
It's possible to add limits for filters in config. If `filters` field is omitted then filters doesn't have any limits.
```json
"grpc": {
"filters": {
"accounts": {
"max": 1,
"any": false,
"account_max": 10,
"account_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
"owner_max": 10,
"owner_reject": ["11111111111111111111111111111111"]
},
"slots": {
"max": 1
},
"transactions": {
"max": 1,
"any": false,
"account_include_max": 10,
"account_include_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
"account_exclude_max": 10
},
"blocks": {
"max": 1
}
}
}
```

View File

@ -5,7 +5,30 @@
},
"grpc": {
"address": "0.0.0.0:10000",
"channel_capacity": "100_000"
"channel_capacity": "100_000",
"filters": {
"accounts": {
"max": 1,
"any": false,
"account_max": 10,
"account_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
"owner_max": 10,
"owner_reject": ["11111111111111111111111111111111"]
},
"slots": {
"max": 1
},
"transactions": {
"max": 1,
"any": false,
"account_include_max": 10,
"account_include_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
"account_exclude_max": 10
},
"blocks": {
"max": 1
}
}
},
"prometheus": {
"address": "0.0.0.0:8999"

View File

@ -25,8 +25,8 @@ message SubscribeRequestFilterSlots {}
message SubscribeRequestFilterTransactions {
optional bool vote = 1;
optional bool failed = 2;
repeated string accounts_include = 3;
repeated string accounts_exclude = 4;
repeated string account_include = 3;
repeated string account_exclude = 4;
}
message SubscribeRequestFilterBlocks {}

View File

@ -77,8 +77,8 @@ async fn main() -> anyhow::Result<()> {
SubscribeRequestFilterTransactions {
vote: args.vote,
failed: args.failed,
accounts_include: vec![],
accounts_exclude: vec![],
account_include: vec![],
account_exclude: vec![],
},
);
}

View File

@ -3,7 +3,8 @@ use {
solana_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPluginError, Result as PluginResult,
},
std::{fs::read_to_string, net::SocketAddr, path::Path},
solana_sdk::pubkey::Pubkey,
std::{collections::HashSet, fs::read_to_string, net::SocketAddr, path::Path},
};
#[derive(Debug, Clone, Deserialize)]
@ -53,19 +54,114 @@ impl ConfigLog {
}
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpc {
/// Address of Grpc service.
pub address: SocketAddr,
/// Capacity of the channel per connection
#[serde(deserialize_with = "deserialize_channel_capacity")]
#[serde(
default = "ConfigGrpc::channel_capacity_default",
deserialize_with = "UsizeStr::deserialize_usize"
)]
pub channel_capacity: usize,
/// Limits for possible filters
#[serde(default)]
pub filters: Option<ConfigGrpcFilters>,
}
fn deserialize_channel_capacity<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
D: Deserializer<'de>,
{
Ok(UsizeStr::deserialize(deserializer)?.value)
impl ConfigGrpc {
const fn channel_capacity_default() -> usize {
250_000
}
}
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFilters {
pub accounts: ConfigGrpcFiltersAccounts,
pub slots: ConfigGrpcFiltersSlots,
pub transactions: ConfigGrpcFiltersTransactions,
pub blocks: ConfigGrpcFiltersBlocks,
}
impl ConfigGrpcFilters {
pub fn check_max(len: usize, max: usize) -> anyhow::Result<()> {
anyhow::ensure!(
len <= max,
"Max amount of filters reached, only {} allowed",
max
);
Ok(())
}
pub fn check_any(is_empty: bool, any: bool) -> anyhow::Result<()> {
anyhow::ensure!(
!is_empty || any,
"Broadcast `any` is not allowed, at least one filter required"
);
Ok(())
}
pub fn check_pubkey_max(len: usize, max: usize) -> anyhow::Result<()> {
anyhow::ensure!(
len <= max,
"Max amount of Pubkeys reached, only {} allowed",
max
);
Ok(())
}
pub fn check_pubkey_reject(pubkey: &Pubkey, set: &HashSet<Pubkey>) -> anyhow::Result<()> {
anyhow::ensure!(
!set.contains(pubkey),
"Pubkey {} in filters not allowed",
pubkey
);
Ok(())
}
}
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFiltersAccounts {
pub max: usize,
pub any: bool,
pub account_max: usize,
#[serde(deserialize_with = "deserialize_pubkey_set")]
pub account_reject: HashSet<Pubkey>,
pub owner_max: usize,
#[serde(deserialize_with = "deserialize_pubkey_set")]
pub owner_reject: HashSet<Pubkey>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFiltersSlots {
pub max: usize,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFiltersTransactions {
pub max: usize,
pub any: bool,
pub account_include_max: usize,
#[serde(deserialize_with = "deserialize_pubkey_set")]
pub account_include_reject: HashSet<Pubkey>,
pub account_exclude_max: usize,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFiltersBlocks {
pub max: usize,
}
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigPrometheus {
/// Address of Prometheus service.
pub address: SocketAddr,
}
#[derive(Debug, Default, PartialEq, Eq, Hash)]
@ -96,9 +192,25 @@ impl<'de> Deserialize<'de> for UsizeStr {
}
}
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigPrometheus {
/// Address of Prometheus service.
pub address: SocketAddr,
impl UsizeStr {
fn deserialize_usize<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
D: Deserializer<'de>,
{
Ok(Self::deserialize(deserializer)?.value)
}
}
fn deserialize_pubkey_set<'de, D>(deserializer: D) -> Result<HashSet<Pubkey>, D::Error>
where
D: Deserializer<'de>,
{
Vec::<&str>::deserialize(deserializer)?
.into_iter()
.map(|value| {
value.parse().map_err(|error| {
de::Error::custom(format!("Invalid pubkey: {} ({:?})", value, error))
})
})
.collect::<Result<_, _>>()
}

View File

@ -1,5 +1,9 @@
use {
crate::{
config::{
ConfigGrpcFilters, ConfigGrpcFiltersAccounts, ConfigGrpcFiltersBlocks,
ConfigGrpcFiltersSlots, ConfigGrpcFiltersTransactions,
},
grpc::{Message, MessageAccount, MessageBlock, MessageSlot, MessageTransaction},
proto::{
SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterBlocks,
@ -9,8 +13,8 @@ use {
solana_sdk::pubkey::Pubkey,
std::{
collections::{HashMap, HashSet},
convert::TryFrom,
hash::Hash,
iter::FromIterator,
str::FromStr,
},
};
@ -23,20 +27,40 @@ pub struct Filter {
blocks: FilterBlocks,
}
impl TryFrom<&SubscribeRequest> for Filter {
type Error = anyhow::Error;
fn try_from(config: &SubscribeRequest) -> Result<Self, Self::Error> {
impl Filter {
pub fn new(
config: &SubscribeRequest,
limit: Option<&ConfigGrpcFilters>,
) -> anyhow::Result<Self> {
Ok(Self {
accounts: FilterAccounts::try_from(&config.accounts)?,
slots: FilterSlots::try_from(&config.slots)?,
transactions: FilterTransactions::try_from(&config.transactions)?,
blocks: FilterBlocks::try_from(&config.blocks)?,
accounts: FilterAccounts::new(&config.accounts, limit.map(|v| &v.accounts))?,
slots: FilterSlots::new(&config.slots, limit.map(|v| &v.slots))?,
transactions: FilterTransactions::new(
&config.transactions,
limit.map(|v| &v.transactions),
)?,
blocks: FilterBlocks::new(&config.blocks, limit.map(|v| &v.blocks))?,
})
}
}
impl Filter {
fn decode_pubkeys<T: FromIterator<Pubkey>>(
pubkeys: &[String],
limit: Option<&HashSet<Pubkey>>,
) -> anyhow::Result<T> {
pubkeys
.iter()
.map(|value| match Pubkey::from_str(value) {
Ok(pubkey) => {
if let Some(limit) = limit {
ConfigGrpcFilters::check_pubkey_reject(&pubkey, limit)?;
}
Ok(pubkey)
}
Err(error) => Err(error.into()),
})
.collect::<_>()
}
pub fn get_filters(&self, message: &Message) -> Vec<String> {
match message {
Message::Account(message) => self.accounts.get_filters(message),
@ -56,57 +80,53 @@ struct FilterAccounts {
owner_required: HashSet<String>,
}
impl TryFrom<&HashMap<String, SubscribeRequestFilterAccounts>> for FilterAccounts {
type Error = anyhow::Error;
fn try_from(
impl FilterAccounts {
fn new(
configs: &HashMap<String, SubscribeRequestFilterAccounts>,
) -> Result<Self, Self::Error> {
limit: Option<&ConfigGrpcFiltersAccounts>,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
let mut this = Self::default();
for (name, filter) in configs {
if let Some(limit) = limit {
ConfigGrpcFilters::check_any(
filter.account.is_empty() && filter.owner.is_empty(),
limit.any,
)?;
ConfigGrpcFilters::check_pubkey_max(filter.account.len(), limit.account_max)?;
ConfigGrpcFilters::check_pubkey_max(filter.owner.len(), limit.owner_max)?;
}
Self::set(
&mut this.account,
&mut this.account_required,
name,
filter
.account
.iter()
.map(|v| Pubkey::from_str(v))
.collect::<Result<Vec<_>, _>>()?
.into_iter(),
Filter::decode_pubkeys(&filter.account, limit.map(|v| &v.account_reject))?,
);
Self::set(
&mut this.owner,
&mut this.owner_required,
name,
filter
.owner
.iter()
.map(|v| Pubkey::from_str(v))
.collect::<Result<Vec<_>, _>>()?
.into_iter(),
Filter::decode_pubkeys(&filter.owner, limit.map(|v| &v.owner_reject))?,
);
this.filters.push(name.clone());
}
Ok(this)
}
}
impl FilterAccounts {
fn set<Q, I>(
map: &mut HashMap<Q, HashSet<String>>,
fn set(
map: &mut HashMap<Pubkey, HashSet<String>>,
map_required: &mut HashSet<String>,
name: &str,
keys: I,
) -> bool
where
Q: Hash + Eq + Clone,
I: Iterator<Item = Q>,
{
keys: Vec<Pubkey>,
) -> bool {
let mut required = false;
for key in keys {
for key in keys.into_iter() {
if map.entry(key).or_default().insert(name.to_string()) {
required = true;
}
@ -194,12 +214,15 @@ struct FilterSlots {
filters: Vec<String>,
}
impl TryFrom<&HashMap<String, SubscribeRequestFilterSlots>> for FilterSlots {
type Error = anyhow::Error;
fn try_from(
impl FilterSlots {
fn new(
configs: &HashMap<String, SubscribeRequestFilterSlots>,
) -> Result<Self, Self::Error> {
limit: Option<&ConfigGrpcFiltersSlots>,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
Ok(FilterSlots {
filters: configs
.iter()
@ -208,9 +231,7 @@ impl TryFrom<&HashMap<String, SubscribeRequestFilterSlots>> for FilterSlots {
.collect(),
})
}
}
impl FilterSlots {
fn get_filters(&self, _message: &MessageSlot) -> Vec<String> {
self.filters.clone()
}
@ -220,8 +241,8 @@ impl FilterSlots {
pub struct FilterTransactionsInner {
vote: Option<bool>,
failed: Option<bool>,
accounts_include: HashSet<Pubkey>,
accounts_exclude: HashSet<Pubkey>,
account_include: HashSet<Pubkey>,
account_exclude: HashSet<Pubkey>,
}
#[derive(Debug, Default)]
@ -229,37 +250,51 @@ pub struct FilterTransactions {
filters: HashMap<String, FilterTransactionsInner>,
}
impl TryFrom<&HashMap<String, SubscribeRequestFilterTransactions>> for FilterTransactions {
type Error = anyhow::Error;
fn try_from(
impl FilterTransactions {
fn new(
configs: &HashMap<String, SubscribeRequestFilterTransactions>,
) -> Result<Self, Self::Error> {
limit: Option<&ConfigGrpcFiltersTransactions>,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
let mut this = Self::default();
for (name, filter) in configs {
if let Some(limit) = limit {
ConfigGrpcFilters::check_any(
filter.vote.is_none()
&& filter.failed.is_none()
&& filter.account_include.is_empty()
&& filter.account_exclude.is_empty(),
limit.any,
)?;
ConfigGrpcFilters::check_pubkey_max(
filter.account_include.len(),
limit.account_include_max,
)?;
ConfigGrpcFilters::check_pubkey_max(
filter.account_exclude.len(),
limit.account_exclude_max,
)?;
}
this.filters.insert(
name.clone(),
FilterTransactionsInner {
vote: filter.vote,
failed: filter.failed,
accounts_include: filter
.accounts_include
.iter()
.map(|v| Pubkey::from_str(v))
.collect::<Result<_, _>>()?,
accounts_exclude: filter
.accounts_exclude
.iter()
.map(|v| Pubkey::from_str(v))
.collect::<Result<_, _>>()?,
account_include: Filter::decode_pubkeys(
&filter.account_include,
limit.map(|v| &v.account_include_reject),
)?,
account_exclude: Filter::decode_pubkeys(&filter.account_exclude, None)?,
},
);
}
Ok(this)
}
}
impl FilterTransactions {
pub fn get_filters(
&self,
MessageTransaction { transaction, .. }: &MessageTransaction,
@ -279,24 +314,24 @@ impl FilterTransactions {
}
}
if !inner.accounts_include.is_empty()
if !inner.account_include.is_empty()
&& transaction
.transaction
.message()
.account_keys()
.iter()
.all(|pubkey| !inner.accounts_include.contains(pubkey))
.all(|pubkey| !inner.account_include.contains(pubkey))
{
return None;
}
if !inner.accounts_exclude.is_empty()
if !inner.account_exclude.is_empty()
&& transaction
.transaction
.message()
.account_keys()
.iter()
.any(|pubkey| inner.accounts_exclude.contains(pubkey))
.any(|pubkey| inner.account_exclude.contains(pubkey))
{
return None;
}
@ -312,12 +347,15 @@ struct FilterBlocks {
filters: Vec<String>,
}
impl TryFrom<&HashMap<String, SubscribeRequestFilterBlocks>> for FilterBlocks {
type Error = anyhow::Error;
fn try_from(
impl FilterBlocks {
fn new(
configs: &HashMap<String, SubscribeRequestFilterBlocks>,
) -> Result<Self, Self::Error> {
limit: Option<&ConfigGrpcFiltersBlocks>,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
Ok(FilterBlocks {
filters: configs
.iter()
@ -326,9 +364,7 @@ impl TryFrom<&HashMap<String, SubscribeRequestFilterBlocks>> for FilterBlocks {
.collect(),
})
}
}
impl FilterBlocks {
fn get_filters(&self, _message: &MessageBlock) -> Vec<String> {
self.filters.clone()
}

View File

@ -321,7 +321,7 @@ impl Geyser for GrpcService {
let id = self.subscribe_id.fetch_add(1, Ordering::SeqCst);
info!("{}, new subscriber", id);
let filter = match Filter::try_from(request.get_ref()) {
let filter = match Filter::new(request.get_ref(), self.config.filters.as_ref()) {
Ok(filter) => filter,
Err(error) => {
let message = format!("failed to create filter: {:?}", error);