use pubkey instead of string for account_id and program_id

This commit is contained in:
GroovieGermanikus 2023-09-05 15:49:10 +02:00
parent 7f2a0be3c9
commit 8ea8e8fa65
12 changed files with 291 additions and 30 deletions

103
Cargo.lock generated
View File

@ -307,6 +307,54 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "anstream"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15c4c2c83f81532e5845a733998b6971faca23490340a418e9b72a3ec9de12ea"
[[package]]
name = "anstyle-parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "938874ff5980b03a87c5524b3ae5b59cf99b1d6bc836848df7bc5ada9643c333"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ca11d4be1bab0c8bc8734a9aa7bf4ee8316d462a08c6ac5052f888fef5b494b"
dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "anstyle-wincon"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd"
dependencies = [
"anstyle",
"windows-sys 0.48.0",
]
[[package]]
name = "anyhow"
version = "1.0.70"
@ -1075,7 +1123,7 @@ checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5"
dependencies = [
"atty",
"bitflags",
"clap_lex",
"clap_lex 0.2.4",
"indexmap",
"once_cell",
"strsim 0.10.0",
@ -1083,6 +1131,40 @@ dependencies = [
"textwrap 0.16.0",
]
[[package]]
name = "clap"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a13b88d2c62ff462f88e4a121f17a82c1af05693a2f192b5c38d14de73c19f6"
dependencies = [
"clap_builder",
"clap_derive",
]
[[package]]
name = "clap_builder"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bb9faaa7c2ef94b2743a21f5a29e6f0010dff4caa69ac8e9d6cf8b6fa74da08"
dependencies = [
"anstream",
"anstyle",
"clap_lex 0.5.1",
"strsim 0.10.0",
]
[[package]]
name = "clap_derive"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0862016ff20d69b84ef8247369fabf5c008a7417002411897d40ee1f4532b873"
dependencies = [
"heck 0.4.1",
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 2.0.15",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
@ -1092,6 +1174,12 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "clap_lex"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961"
[[package]]
name = "cloudabi"
version = "0.0.3"
@ -1111,6 +1199,12 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "colorchoice"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "combine"
version = "3.8.1"
@ -3064,6 +3158,7 @@ dependencies = [
"anyhow",
"async-channel",
"async-trait",
"clap 4.4.2",
"futures 0.3.28",
"itertools 0.10.5",
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -7770,6 +7865,12 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "vcpkg"
version = "0.2.15"

View File

@ -44,3 +44,5 @@ yellowstone-grpc-proto = "1.1.0"
[dev-dependencies]
solana-logger = "*"
clap = { version = "4.4.2", features = ["derive", "env"] }

View File

@ -0,0 +1,105 @@
#![allow(unused_variables)]
use mango_feeds_connector::{grpc_plugin_source, metrics, AccountWrite, FilterConfig, GrpcSourceConfig, MetricsConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, EntityFilter};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
///
/// test with local test-valiator (1.16.1, yellowstone-grpc v1.7.1+solana.1.16.1)
///
/// ```
/// RUST_LOG=info solana-test-validator --log --geyser-plugin-config /pathto/mango-feeds/connector/examples/config-yellowstone-grpc-testing.json
/// solana -ul transfer 2pvrKRRjCtCBUJVZcr6z9QbCPrXLhZRMCpXQYzJuhH9J 0.1
/// ```
///
#[tokio::main]
async fn main() -> anyhow::Result<()> {
solana_logger::setup_with_default(
"info,tokio_reactor=info,mango_feeds_connector::grpc_plugin_source=debug",
);
let metrics_tx = metrics::start(
MetricsConfig {
output_stdout: false,
output_http: false,
},
"example".to_string(),
);
let exit: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let config = SourceConfig {
// only used for geyser
dedup_queue_size: 100,
// only used for geyser
grpc_sources: vec![GrpcSourceConfig {
// used in metrics
name: "example-consumer".to_string(),
connection_string: "http://127.0.0.1:10000".to_string(),
token: None,
retry_connection_sleep_secs: 10,
tls: None,
}],
// used for websocket+geyser
snapshot: SnapshotSourceConfig {
rpc_http_url: "http://127.0.0.1:8899".to_string(),
},
// used only for websocket
rpc_ws_url: "ws://localhost:55555/".to_string(),
};
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<AccountWrite>();
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
tokio::spawn(async move {
loop {
let next = slot_queue_receiver.recv().await.unwrap();
// println!("got slot: {:?}", next);
}
});
tokio::spawn(async move {
loop {
let next = account_write_queue_receiver.recv().await.unwrap();
println!("got account write: {:?}", next);
}
});
let filter_config1 = FilterConfig {
entity_filter: EntityFilter::filter_by_program_id("11111111111111111111111111111111"),
};
// an account that exists
let filter_config2 = FilterConfig {
entity_filter: EntityFilter::filter_by_account_ids(vec![
"2z5cFZAmL5HgDYXPAfEVpWn33Nixsu3iSsg5PDCFDWSb"
]),
};
// an account that does not exist
let filter_config3 = FilterConfig {
entity_filter: EntityFilter::filter_by_account_ids(vec![
"aorYUvexUBb6cRFpmauF3ofgUDDpFZcRpHpcp5B2Zip"
]),
};
let filter_config4 = FilterConfig {
entity_filter: EntityFilter::filter_by_account_ids(vec![]),
};
let filter_config = filter_config1;
grpc_plugin_source::process_events(
&config,
&filter_config,
account_write_queue_sender,
slot_queue_sender,
metrics_tx.clone(),
exit.clone(),
)
.await;
Ok(())
}

View File

@ -1,6 +1,5 @@
#![allow(unused_variables)]
use mango_feeds_connector::EntityFilter::FilterByAccountIds;
use mango_feeds_connector::{grpc_plugin_source, metrics, AccountWrite, FilterConfig, GrpcSourceConfig, MetricsConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, EntityFilter};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
@ -87,7 +86,7 @@ async fn main() -> anyhow::Result<()> {
};
let filter_config4 = FilterConfig {
entity_filter: FilterByAccountIds(vec![]),
entity_filter: EntityFilter::filter_by_account_ids(vec![]),
};
let filter_config = filter_config1;

View File

@ -0,0 +1,63 @@
#![allow(unused_variables)]
use clap::Parser;
use jsonrpc_core_client::transports::http;
use solana_account_decoder::UiAccountEncoding;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_response::OptionalContext;
use solana_rpc::rpc::rpc_accounts::AccountsDataClient as GetProgramAccountsClient;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
// use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient as GetProgramAccountsClient;
/// this tool tests the differences between rpc_accounts and rpc_accounts_scan (should be same)
#[derive(Parser, Debug, Clone)]
#[clap()]
struct Cli {
// e.g. https://mango.devnet.rpcpool.com
#[clap(short, long, env)]
rpc_url: String,
// e.g. 4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg
#[clap(short, long, env)]
program_account: Pubkey,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
solana_logger::setup_with_default(
"info",
);
let cli = Cli::parse_from(std::env::args_os());
let rpc_http_url = cli.rpc_url;
let program_id = cli.program_account;
let rpc_client = http::connect::<GetProgramAccountsClient>(&rpc_http_url)
.await.unwrap();
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
let program_info_config = RpcProgramAccountsConfig {
filters: None,
account_config: account_info_config,
with_context: Some(true),
};
let snapshot = rpc_client.get_program_accounts(program_id.to_string(), Some(program_info_config)).await;
if let OptionalContext::Context(snapshot_data) = snapshot.unwrap() {
println!("api version: {:?}", snapshot_data.context.api_version);
println!("#accounts {:?}", snapshot_data.value.len());
// mainnet, mango accounts: #accounts 1971, 1.14.24
}
Ok(())
}

View File

@ -1,9 +1,6 @@
#![allow(unused_variables)]
use mango_feeds_connector::EntityFilter::{FilterByAccountIds, FilterByProgramId};
use mango_feeds_connector::{
websocket_source, AccountWrite, FilterConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig,
};
use mango_feeds_connector::{websocket_source, AccountWrite, FilterConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig, EntityFilter};
///
/// test with local test-valiator (1.16.1)
@ -34,25 +31,25 @@ async fn main() -> anyhow::Result<()> {
};
let filter_config1 = FilterConfig {
entity_filter: FilterByProgramId("11111111111111111111111111111111".to_string()),
entity_filter: EntityFilter::filter_by_program_id("11111111111111111111111111111111"),
};
// an account that exists
let filter_config2 = FilterConfig {
entity_filter: FilterByAccountIds(vec![
"2z5cFZAmL5HgDYXPAfEVpWn33Nixsu3iSsg5PDCFDWSb".to_string()
entity_filter: EntityFilter::filter_by_account_ids(vec![
"2z5cFZAmL5HgDYXPAfEVpWn33Nixsu3iSsg5PDCFDWSb"
]),
};
// an account that does not exis
let filter_config3 = FilterConfig {
entity_filter: FilterByAccountIds(vec![
"aorYUvexUBb6cRFpmauF3ofgUDDpFZcRpHpcp5B2Zip".to_string()
entity_filter: EntityFilter::filter_by_account_ids(vec![
"aorYUvexUBb6cRFpmauF3ofgUDDpFZcRpHpcp5B2Zip"
]),
};
let filter_config4 = FilterConfig {
entity_filter: FilterByAccountIds(vec![]),
entity_filter: EntityFilter::filter_by_account_ids(vec![]),
};
let filter_config = filter_config1;

View File

@ -108,7 +108,7 @@ async fn feed_data_geyser(
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
account: account_ids.into_iter().map(Pubkey::to_string).collect(),
account: account_ids.iter().map(Pubkey::to_string).collect(),
owner: vec![],
filters: vec![],
},
@ -211,7 +211,7 @@ async fn feed_data_geyser(
snapshot_needed = false;
match &filter_config.entity_filter {
EntityFilter::FilterByAccountIds(account_ids) => {
let account_ids_typed = account_ids.into_iter().map(Pubkey::to_string).collect();
let account_ids_typed = account_ids.iter().map(Pubkey::to_string).collect();
snapshot_gma = tokio::spawn(get_snapshot_gma(snapshot_rpc_http_url.clone(), account_ids_typed)).fuse();
},
EntityFilter::FilterByProgramId(program_id) => {

View File

@ -48,7 +48,7 @@ async fn feed_data(
) -> anyhow::Result<()> {
match &filter_config.entity_filter {
EntityFilter::FilterByAccountIds(account_ids) => {
let account_ids_typed = account_ids.into_iter().map(Pubkey::to_string).collect();
let account_ids_typed = account_ids.iter().map(Pubkey::to_string).collect();
feed_data_by_accounts(config, account_ids_typed, sender).await
}
EntityFilter::FilterByProgramId(program_id) => {

View File

@ -22,6 +22,7 @@ use std::{
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use std::iter::FromIterator;
use mango_feeds_lib::EntityFilter::FilterByAccountIds;
use mango_feeds_lib::FilterConfig;
@ -154,7 +155,7 @@ async fn main() -> anyhow::Result<()> {
.collect();
let filter_config = FilterConfig {
entity_filter: FilterByAccountIds(all_queue_pks.iter().map(|pk| pk.to_string()).collect()),
entity_filter: FilterByAccountIds(Vec::from_iter(all_queue_pks)),
};
if use_geyser {
grpc_plugin_source::process_events(

View File

@ -12,12 +12,7 @@ use futures_util::{
pin_mut, SinkExt, StreamExt, TryStreamExt,
};
use log::*;
use mango_feeds_lib::{
grpc_plugin_source, metrics,
metrics::{MetricType, MetricU64},
websocket_source, FilterConfig, MarketConfig, MetricsConfig, PostgresConfig, SourceConfig,
StatusResponse,
};
use mango_feeds_lib::{grpc_plugin_source, metrics, metrics::{MetricType, MetricU64}, websocket_source, FilterConfig, MarketConfig, MetricsConfig, PostgresConfig, SourceConfig, StatusResponse, EntityFilter};
use mango_v4_client::{Client, MangoGroupContext, TransactionBuilderConfig};
use service_mango_fills::{Command, FillCheckpoint, FillEventFilterMessage, FillEventType};
use std::{
@ -40,7 +35,6 @@ use tokio::{
};
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use mango_feeds_lib::EntityFilter::FilterByAccountIds;
use serde::Deserialize;
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
@ -615,9 +609,9 @@ async fn main() -> anyhow::Result<()> {
);
let use_geyser = true;
let all_queue_pks = [perp_queue_pks.clone()].concat();
let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect();
let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1).collect();
let filter_config = FilterConfig {
entity_filter: FilterByAccountIds(relevant_pubkeys),
entity_filter: EntityFilter::FilterByAccountIds(relevant_pubkeys),
};
if use_geyser {
grpc_plugin_source::process_events(

View File

@ -587,7 +587,7 @@ async fn main() -> anyhow::Result<()> {
let relevant_pubkeys = [market_configs.clone(), serum_market_configs.clone()]
.concat()
.iter()
.flat_map(|m| [m.1.bids.to_string(), m.1.asks.to_string()])
.flat_map(|m| [m.1.bids, m.1.asks])
.collect_vec();
let filter_config = FilterConfig {
entity_filter: FilterByAccountIds(
@ -595,7 +595,7 @@ async fn main() -> anyhow::Result<()> {
relevant_pubkeys,
market_configs
.iter()
.map(|(_, mkt)| mkt.oracle.to_string())
.map(|(_, mkt)| mkt.oracle)
.collect_vec(),
]
.concat()

View File

@ -153,7 +153,6 @@ struct PnlResponseItem {
}
use jsonrpsee::http_server::HttpServerHandle;
use mango_feeds_lib::EntityFilter::FilterByProgramId;
fn start_jsonrpc_server(
config: JsonRpcConfig,
@ -303,7 +302,7 @@ async fn main() -> anyhow::Result<()> {
// start filling chain_data from the grpc plugin source
let (account_write_queue_sender, slot_queue_sender) = memory_target::init(chain_data).await?;
let filter_config = FilterConfig {
entity_filter: FilterByProgramId("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into()),
entity_filter: EntityFilter::filter_by_program_id("4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg"),
};
grpc_plugin_source::process_events(
&config.source,