token account
This commit is contained in:
parent
39199b842a
commit
2cef5ef87b
|
@ -6,6 +6,7 @@ use solana_sdk::commitment_config::CommitmentConfig;
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::pin::pin;
|
use std::pin::pin;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
use solana_account_decoder::parse_token::{parse_token, spl_token_ids, TokenAccountType};
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
|
||||||
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
|
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
|
||||||
|
@ -17,65 +18,12 @@ use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
|
||||||
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeUpdate};
|
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeUpdate};
|
||||||
use yellowstone_grpc_proto::prost::Message as _;
|
use yellowstone_grpc_proto::prost::Message as _;
|
||||||
|
|
||||||
#[allow(dead_code)]
|
|
||||||
fn start_example_blockmini_consumer(
|
|
||||||
multiplex_stream: impl Stream<Item = BlockMini> + Send + 'static,
|
|
||||||
) {
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut blockmeta_stream = pin!(multiplex_stream);
|
|
||||||
while let Some(mini) = blockmeta_stream.next().await {
|
|
||||||
info!(
|
|
||||||
"emitted block mini #{}@{} with {} bytes from multiplexer",
|
|
||||||
mini.slot, mini.commitment_config.commitment, mini.blocksize
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct BlockMini {
|
|
||||||
pub blocksize: usize,
|
|
||||||
pub slot: Slot,
|
|
||||||
pub commitment_config: CommitmentConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct BlockMiniExtractor(CommitmentConfig);
|
|
||||||
|
|
||||||
impl FromYellowstoneExtractor for BlockMiniExtractor {
|
|
||||||
type Target = BlockMini;
|
|
||||||
fn map_yellowstone_update(&self, update: SubscribeUpdate) -> Option<(Slot, Self::Target)> {
|
|
||||||
match update.update_oneof {
|
|
||||||
Some(UpdateOneof::Block(update_block_message)) => {
|
|
||||||
let blocksize = update_block_message.encoded_len();
|
|
||||||
let slot = update_block_message.slot;
|
|
||||||
let mini = BlockMini {
|
|
||||||
blocksize,
|
|
||||||
slot,
|
|
||||||
commitment_config: self.0,
|
|
||||||
};
|
|
||||||
Some((slot, mini))
|
|
||||||
}
|
|
||||||
Some(UpdateOneof::BlockMeta(update_blockmeta_message)) => {
|
|
||||||
let blocksize = update_blockmeta_message.encoded_len();
|
|
||||||
let slot = update_blockmeta_message.slot;
|
|
||||||
let mini = BlockMini {
|
|
||||||
blocksize,
|
|
||||||
slot,
|
|
||||||
commitment_config: self.0,
|
|
||||||
};
|
|
||||||
Some((slot, mini))
|
|
||||||
}
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
pub async fn main() {
|
pub async fn main() {
|
||||||
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
|
// RUST_LOG=info,stream_blocks_mainnet=debug,geyser_grpc_connector=trace
|
||||||
tracing_subscriber::fmt::init();
|
tracing_subscriber::fmt::init();
|
||||||
// console_subscriber::init();
|
// console_subscriber::init();
|
||||||
|
|
||||||
let COMMITMENT_LEVEL = CommitmentConfig::processed();
|
|
||||||
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
|
let grpc_addr_green = env::var("GRPC_ADDR").expect("need grpc url for green");
|
||||||
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
|
let grpc_x_token_green = env::var("GRPC_X_TOKEN").ok();
|
||||||
|
|
||||||
|
@ -98,30 +46,47 @@ pub async fn main() {
|
||||||
|
|
||||||
let green_stream = create_geyser_reconnecting_stream(
|
let green_stream = create_geyser_reconnecting_stream(
|
||||||
config.clone(),
|
config.clone(),
|
||||||
// GeyserFilter(COMMITMENT_LEVEL).accounts(),
|
|
||||||
token_accounts(),
|
token_accounts(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let blue_stream = create_geyser_reconnecting_stream(
|
|
||||||
config.clone(),
|
|
||||||
GeyserFilter(COMMITMENT_LEVEL).blocks_and_txs(),
|
|
||||||
);
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let token_pk = Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap();
|
|
||||||
let mut green_stream = pin!(green_stream);
|
let mut green_stream = pin!(green_stream);
|
||||||
while let Some(message) = green_stream.next().await {
|
while let Some(message) = green_stream.next().await {
|
||||||
match message {
|
match message {
|
||||||
Message::GeyserSubscribeUpdate(subscriber_update) => {
|
Message::GeyserSubscribeUpdate(subscriber_update) => {
|
||||||
match subscriber_update.update_oneof {
|
match subscriber_update.update_oneof {
|
||||||
Some(UpdateOneof::Account(update)) => {
|
Some(UpdateOneof::Account(update)) => {
|
||||||
let account_info = update.account.unwrap();
|
let account = update.account.unwrap();
|
||||||
let account_pk = Pubkey::try_from(account_info.pubkey).unwrap();
|
let account_pk = Pubkey::try_from(account.pubkey).unwrap();
|
||||||
|
|
||||||
// if account_pk == token_pk {
|
let account_type = parse_token(&account.data, Some(6)).unwrap();
|
||||||
info!("got account update (green)!!! {} - {:?} - {} bytes",
|
match account_type {
|
||||||
update.slot, account_pk, account_info.data.len());
|
TokenAccountType::Account(account_ui) => {
|
||||||
// }
|
// UiTokenAccount {
|
||||||
|
// mint: "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
|
||||||
|
// owner: "7XDMxfmzmL2Hqhh8ABp4Byc5UEsMWfQdWo6r5vEVQNb8",
|
||||||
|
// token_amount: UiTokenAmount {
|
||||||
|
// ui_amount: Some(0.0), decimals: 6, amount: "0", ui_amount_string: "0"
|
||||||
|
// },
|
||||||
|
// delegate: None,
|
||||||
|
// state: Initialized,
|
||||||
|
// is_native: false,
|
||||||
|
// rent_exempt_reserve: None,
|
||||||
|
// delegated_amount: None,
|
||||||
|
// close_authority: None,
|
||||||
|
// extensions: []
|
||||||
|
// }
|
||||||
|
info!("it's an Account: {:?}", account_ui);
|
||||||
|
}
|
||||||
|
TokenAccountType::Mint(mint) => {
|
||||||
|
// not interesting
|
||||||
|
}
|
||||||
|
TokenAccountType::Multisig(_) => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("got account update (green)!!! {} - {:?} - {} bytes",
|
||||||
|
update.slot, account_pk, account.data.len());
|
||||||
let bytes: [u8; 32] =
|
let bytes: [u8; 32] =
|
||||||
account_pk.to_bytes();
|
account_pk.to_bytes();
|
||||||
}
|
}
|
||||||
|
@ -136,52 +101,23 @@ pub async fn main() {
|
||||||
warn!("Stream aborted");
|
warn!("Stream aborted");
|
||||||
});
|
});
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut blue_stream = pin!(blue_stream);
|
|
||||||
let extractor = BlockMiniExtractor(COMMITMENT_LEVEL);
|
|
||||||
while let Some(message) = blue_stream.next().await {
|
|
||||||
match message {
|
|
||||||
Message::GeyserSubscribeUpdate(subscriber_update) => {
|
|
||||||
let mapped = extractor.map_yellowstone_update(*subscriber_update);
|
|
||||||
if let Some((slot, block_mini)) = mapped {
|
|
||||||
info!("got update (blue)!!! block: {} - {} bytes", slot, block_mini.blocksize);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Message::Connecting(attempt) => {
|
|
||||||
warn!("Connection attempt: {}", attempt);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
warn!("Stream aborted");
|
|
||||||
});
|
|
||||||
|
|
||||||
// "infinite" sleep
|
// "infinite" sleep
|
||||||
sleep(Duration::from_secs(1800)).await;
|
sleep(Duration::from_secs(1800)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn map_block_update(update: SubscribeUpdate) -> Option<Slot> {
|
|
||||||
match update.update_oneof {
|
|
||||||
Some(UpdateOneof::Block(update_block_message)) => {
|
|
||||||
let slot = update_block_message.slot;
|
|
||||||
Some(slot)
|
|
||||||
}
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn token_accounts() -> SubscribeRequest {
|
pub fn token_accounts() -> SubscribeRequest {
|
||||||
let mut accounts_subs = HashMap::new();
|
let mut accounts_subs = HashMap::new();
|
||||||
accounts_subs.insert(
|
accounts_subs.insert(
|
||||||
"client".to_string(),
|
"client".to_string(),
|
||||||
SubscribeRequestFilterAccounts {
|
SubscribeRequestFilterAccounts {
|
||||||
account: vec![],
|
account: vec![],
|
||||||
owner: vec![
|
owner:
|
||||||
"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string(),
|
spl_token_ids().iter().map(|pubkey| pubkey.to_string()).collect(),
|
||||||
],
|
|
||||||
filters: vec![],
|
filters: vec![],
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
SubscribeRequest {
|
SubscribeRequest {
|
||||||
slots: HashMap::new(),
|
slots: HashMap::new(),
|
||||||
accounts: accounts_subs,
|
accounts: accounts_subs,
|
||||||
|
|
Loading…
Reference in New Issue