mango-feeds/connector/examples/geyser_example_consumer.rs

102 lines
3.1 KiB
Rust
Raw Normal View History

2023-08-25 06:04:09 -07:00
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
2023-08-26 11:52:06 -07:00
use mango_feeds_connector::{AccountWrite, FilterConfig, grpc_plugin_source, GrpcSourceConfig, metrics, MetricsConfig, SlotUpdate, SnapshotSourceConfig, SourceConfig};
use mango_feeds_connector::EntityFilter::{FilterByAccountIds, FilterByProgramId};
2023-08-24 13:59:43 -07:00
2023-08-25 01:38:55 -07:00
///
2023-08-26 11:52:06 -07:00
/// test with local test-valiator (1.16.1, yellowstone-grpc v1.7.1+solana.1.16.1)
2023-08-25 01:38:55 -07:00
///
/// ```
2023-08-25 06:04:09 -07:00
/// RUST_LOG=info solana-test-validator --log --geyser-plugin-config /pathto/mango-feeds/connector/examples/config-yellowstone-grpc-testing.json
2023-08-25 01:38:55 -07:00
/// solana -ul transfer 2pvrKRRjCtCBUJVZcr6z9QbCPrXLhZRMCpXQYzJuhH9J 0.1
/// ```
///
2023-08-24 13:59:43 -07:00
#[tokio::main]
async fn main() -> anyhow::Result<()> {
2023-08-25 06:04:09 -07:00
solana_logger::setup_with_default("info,tokio_reactor=info,mango_feeds_connector::grpc_plugin_source=debug");
let metrics_tx = metrics::start(MetricsConfig { output_stdout: false, output_http: false}, "example".to_string());
let exit: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
2023-08-24 13:59:43 -07:00
let config = SourceConfig {
// only used for geyser
2023-08-25 06:04:09 -07:00
dedup_queue_size: 100,
2023-08-24 13:59:43 -07:00
// only used for geyser
grpc_sources: vec![
GrpcSourceConfig {
// used in metrics
2023-08-25 05:03:24 -07:00
name: "example-consumer".to_string(),
2023-08-25 06:04:09 -07:00
connection_string: "http://127.0.0.1:10000".to_string(),
2023-08-24 13:59:43 -07:00
token: None,
retry_connection_sleep_secs: 10,
tls: None,
}
],
// used for websocket+geyser
snapshot: SnapshotSourceConfig { rpc_http_url: "http://127.0.0.1:8899".to_string() },
2023-08-25 01:38:55 -07:00
// used only for websocket
2023-08-25 06:04:09 -07:00
rpc_ws_url: "ws://localhost:55555/".to_string(),
2023-08-25 05:03:24 -07:00
};
2023-08-25 03:31:32 -07:00
2023-08-24 13:59:43 -07:00
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::unbounded::<AccountWrite>();
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
tokio::spawn(async move {
loop {
let next = slot_queue_receiver.recv().await.unwrap();
2023-08-26 11:52:06 -07:00
// println!("got slot: {:?}", next);
2023-08-24 13:59:43 -07:00
}
});
tokio::spawn(async move {
loop {
let next = account_write_queue_receiver.recv().await.unwrap();
println!("got account write: {:?}", next);
}
});
2023-08-25 06:04:09 -07:00
let filter_config1 = FilterConfig {
entity_filter: FilterByProgramId("11111111111111111111111111111111".to_string()),
};
2023-08-26 11:52:06 -07:00
// an account that exists
let filter_config2 = FilterConfig {
entity_filter: FilterByAccountIds(vec!["2z5cFZAmL5HgDYXPAfEVpWn33Nixsu3iSsg5PDCFDWSb".to_string()]),
};
// an account that does not exist
let filter_config3 = FilterConfig {
entity_filter: FilterByAccountIds(vec!["aorYUvexUBb6cRFpmauF3ofgUDDpFZcRpHpcp5B2Zip".to_string()]),
};
let filter_config4 = FilterConfig {
entity_filter: FilterByAccountIds(vec![]),
};
2023-08-28 15:55:22 -07:00
let filter_config = filter_config1;
2023-08-26 11:52:06 -07:00
2023-08-25 06:04:09 -07:00
grpc_plugin_source::process_events(
2023-08-24 13:59:43 -07:00
&config,
&filter_config,
account_write_queue_sender,
slot_queue_sender,
2023-08-25 06:04:09 -07:00
metrics_tx.clone(),
exit.clone(),
2023-08-24 13:59:43 -07:00
)
.await;
Ok(())
}