diff --git a/Cargo.toml b/Cargo.toml index d05e344..e88c768 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" yellowstone-grpc-client = "1.11.0" yellowstone-grpc-proto = "1.11.0" +# required for CommitmentConfig solana-sdk = "~1.16.17" url = "2.5.0" diff --git a/examples/stream_blocks_mainnet.rs b/examples/stream_blocks_mainnet.rs index 6f0c285..e0e7665 100644 --- a/examples/stream_blocks_mainnet.rs +++ b/examples/stream_blocks_mainnet.rs @@ -5,7 +5,24 @@ use solana_sdk::commitment_config::CommitmentConfig; use std::env; use std::pin::pin; -use geyser_grpc_connector::experimental::mock_literpc_core::{map_produced_block, ProducedBlock}; + +use base64::Engine; +use itertools::Itertools; +use solana_sdk::borsh0_10::try_from_slice_unchecked; +/// This file mocks the core model of the RPC server. +use solana_sdk::compute_budget; +use solana_sdk::compute_budget::ComputeBudgetInstruction; +use solana_sdk::hash::Hash; +use solana_sdk::instruction::CompiledInstruction; +use solana_sdk::message::v0::MessageAddressTableLookup; +use solana_sdk::message::{v0, MessageHeader, VersionedMessage}; +use solana_sdk::pubkey::Pubkey; + +use solana_sdk::signature::Signature; +use solana_sdk::transaction::TransactionError; +use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; + + use geyser_grpc_connector::grpc_subscription_autoreconnect::{ create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, }; @@ -174,3 +191,210 @@ pub async fn main() { // "infinite" sleep sleep(Duration::from_secs(1800)).await; } + + +#[derive(Default, Debug, Clone)] +pub struct ProducedBlock { + pub transactions: Vec, + // pub leader_id: Option, + pub blockhash: String, + pub block_height: u64, + pub slot: Slot, + pub parent_slot: Slot, + pub block_time: u64, + pub commitment_config: CommitmentConfig, + pub previous_blockhash: String, + // pub rewards: Option>, +} + +#[derive(Debug, Clone)] +pub struct TransactionInfo { + pub signature: String, + pub err: Option, + pub cu_requested: Option, + pub prioritization_fees: Option, + pub cu_consumed: Option, + pub recent_blockhash: String, + pub message: String, +} + +pub fn map_produced_block( + block: SubscribeUpdateBlock, + commitment_config: CommitmentConfig, +) -> ProducedBlock { + let txs: Vec = block + .transactions + .into_iter() + .filter_map(|tx| { + let Some(meta) = tx.meta else { + return None; + }; + + let Some(transaction) = tx.transaction else { + return None; + }; + + let Some(message) = transaction.message else { + return None; + }; + + let Some(header) = message.header else { + return None; + }; + + let signatures = transaction + .signatures + .into_iter() + .filter_map(|sig| match Signature::try_from(sig) { + Ok(sig) => Some(sig), + Err(_) => { + log::warn!( + "Failed to read signature from transaction in block {} - skipping", + block.blockhash + ); + None + } + }) + .collect_vec(); + + let err = meta.err.map(|x| { + bincode::deserialize::(&x.err) + .expect("TransactionError should be deserialized") + }); + + let signature = signatures[0]; + let compute_units_consumed = meta.compute_units_consumed; + + let message = VersionedMessage::V0(v0::Message { + header: MessageHeader { + num_required_signatures: header.num_required_signatures as u8, + num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8, + num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8, + }, + account_keys: message + .account_keys + .into_iter() + .map(|key| { + let bytes: [u8; 32] = + key.try_into().unwrap_or(Pubkey::default().to_bytes()); + Pubkey::new_from_array(bytes) + }) + .collect(), + recent_blockhash: Hash::new(&message.recent_blockhash), + instructions: message + .instructions + .into_iter() + .map(|ix| CompiledInstruction { + program_id_index: ix.program_id_index as u8, + accounts: ix.accounts, + data: ix.data, + }) + .collect(), + address_table_lookups: message + .address_table_lookups + .into_iter() + .map(|table| { + let bytes: [u8; 32] = table + .account_key + .try_into() + .unwrap_or(Pubkey::default().to_bytes()); + MessageAddressTableLookup { + account_key: Pubkey::new_from_array(bytes), + writable_indexes: table.writable_indexes, + readonly_indexes: table.readonly_indexes, + } + }) + .collect(), + }); + + let legacy_compute_budget: Option<(u32, Option)> = + message.instructions().iter().find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { + units, + additional_fee, + }) = try_from_slice_unchecked(i.data.as_slice()) + { + if additional_fee > 0 { + return Some(( + units, + Some(((units * 1000) / additional_fee) as u64), + )); + } else { + return Some((units, None)); + } + } + } + None + }); + + let legacy_cu_requested = legacy_compute_budget.map(|x| x.0); + let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None); + + let cu_requested = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(limit); + } + } + None + }) + .or(legacy_cu_requested); + + let prioritization_fees = message + .instructions() + .iter() + .find_map(|i| { + if i.program_id(message.static_account_keys()) + .eq(&compute_budget::id()) + { + if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = + try_from_slice_unchecked(i.data.as_slice()) + { + return Some(price); + } + } + + None + }) + .or(legacy_prioritization_fees); + + Some(TransactionInfo { + signature: signature.to_string(), + err, + cu_requested, + prioritization_fees, + cu_consumed: compute_units_consumed, + recent_blockhash: message.recent_blockhash().to_string(), + message: base64::engine::general_purpose::STANDARD.encode(message.serialize()), + }) + }) + .collect(); + + // removed rewards + + ProducedBlock { + transactions: txs, + block_height: block + .block_height + .map(|block_height| block_height.block_height) + .unwrap(), + block_time: block.block_time.map(|time| time.timestamp).unwrap() as u64, + blockhash: block.blockhash, + previous_blockhash: block.parent_blockhash, + commitment_config, + // leader_id, + parent_slot: block.parent_slot, + slot: block.slot, + // rewards, + } +} diff --git a/src/experimental/mock_literpc_core.rs b/src/experimental/mock_literpc_core.rs deleted file mode 100644 index 4afabe9..0000000 --- a/src/experimental/mock_literpc_core.rs +++ /dev/null @@ -1,223 +0,0 @@ -use base64::Engine; -use itertools::Itertools; -use solana_sdk::borsh0_10::try_from_slice_unchecked; -/// This file mocks the core model of the RPC server. -use solana_sdk::clock::Slot; -use solana_sdk::commitment_config::CommitmentConfig; -use solana_sdk::compute_budget; -use solana_sdk::compute_budget::ComputeBudgetInstruction; -use solana_sdk::hash::Hash; -use solana_sdk::instruction::CompiledInstruction; -use solana_sdk::message::v0::MessageAddressTableLookup; -use solana_sdk::message::{v0, MessageHeader, VersionedMessage}; -use solana_sdk::pubkey::Pubkey; - -use solana_sdk::signature::Signature; -use solana_sdk::transaction::TransactionError; -use yellowstone_grpc_proto::geyser::SubscribeUpdateBlock; - -#[derive(Default, Debug, Clone)] -pub struct ProducedBlock { - pub transactions: Vec, - // pub leader_id: Option, - pub blockhash: String, - pub block_height: u64, - pub slot: Slot, - pub parent_slot: Slot, - pub block_time: u64, - pub commitment_config: CommitmentConfig, - pub previous_blockhash: String, - // pub rewards: Option>, -} - -#[derive(Debug, Clone)] -pub struct TransactionInfo { - pub signature: String, - pub err: Option, - pub cu_requested: Option, - pub prioritization_fees: Option, - pub cu_consumed: Option, - pub recent_blockhash: String, - pub message: String, -} - -pub fn map_produced_block( - block: SubscribeUpdateBlock, - commitment_config: CommitmentConfig, -) -> ProducedBlock { - let txs: Vec = block - .transactions - .into_iter() - .filter_map(|tx| { - let Some(meta) = tx.meta else { - return None; - }; - - let Some(transaction) = tx.transaction else { - return None; - }; - - let Some(message) = transaction.message else { - return None; - }; - - let Some(header) = message.header else { - return None; - }; - - let signatures = transaction - .signatures - .into_iter() - .filter_map(|sig| match Signature::try_from(sig) { - Ok(sig) => Some(sig), - Err(_) => { - log::warn!( - "Failed to read signature from transaction in block {} - skipping", - block.blockhash - ); - None - } - }) - .collect_vec(); - - let err = meta.err.map(|x| { - bincode::deserialize::(&x.err) - .expect("TransactionError should be deserialized") - }); - - let signature = signatures[0]; - let compute_units_consumed = meta.compute_units_consumed; - - let message = VersionedMessage::V0(v0::Message { - header: MessageHeader { - num_required_signatures: header.num_required_signatures as u8, - num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8, - num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as u8, - }, - account_keys: message - .account_keys - .into_iter() - .map(|key| { - let bytes: [u8; 32] = - key.try_into().unwrap_or(Pubkey::default().to_bytes()); - Pubkey::new_from_array(bytes) - }) - .collect(), - recent_blockhash: Hash::new(&message.recent_blockhash), - instructions: message - .instructions - .into_iter() - .map(|ix| CompiledInstruction { - program_id_index: ix.program_id_index as u8, - accounts: ix.accounts, - data: ix.data, - }) - .collect(), - address_table_lookups: message - .address_table_lookups - .into_iter() - .map(|table| { - let bytes: [u8; 32] = table - .account_key - .try_into() - .unwrap_or(Pubkey::default().to_bytes()); - MessageAddressTableLookup { - account_key: Pubkey::new_from_array(bytes), - writable_indexes: table.writable_indexes, - readonly_indexes: table.readonly_indexes, - } - }) - .collect(), - }); - - let legacy_compute_budget: Option<(u32, Option)> = - message.instructions().iter().find_map(|i| { - if i.program_id(message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::RequestUnitsDeprecated { - units, - additional_fee, - }) = try_from_slice_unchecked(i.data.as_slice()) - { - if additional_fee > 0 { - return Some(( - units, - Some(((units * 1000) / additional_fee) as u64), - )); - } else { - return Some((units, None)); - } - } - } - None - }); - - let legacy_cu_requested = legacy_compute_budget.map(|x| x.0); - let legacy_prioritization_fees = legacy_compute_budget.map(|x| x.1).unwrap_or(None); - - let cu_requested = message - .instructions() - .iter() - .find_map(|i| { - if i.program_id(message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::SetComputeUnitLimit(limit)) = - try_from_slice_unchecked(i.data.as_slice()) - { - return Some(limit); - } - } - None - }) - .or(legacy_cu_requested); - - let prioritization_fees = message - .instructions() - .iter() - .find_map(|i| { - if i.program_id(message.static_account_keys()) - .eq(&compute_budget::id()) - { - if let Ok(ComputeBudgetInstruction::SetComputeUnitPrice(price)) = - try_from_slice_unchecked(i.data.as_slice()) - { - return Some(price); - } - } - - None - }) - .or(legacy_prioritization_fees); - - Some(TransactionInfo { - signature: signature.to_string(), - err, - cu_requested, - prioritization_fees, - cu_consumed: compute_units_consumed, - recent_blockhash: message.recent_blockhash().to_string(), - message: base64::engine::general_purpose::STANDARD.encode(message.serialize()), - }) - }) - .collect(); - - // removed rewards - - ProducedBlock { - transactions: txs, - block_height: block - .block_height - .map(|block_height| block_height.block_height) - .unwrap(), - block_time: block.block_time.map(|time| time.timestamp).unwrap() as u64, - blockhash: block.blockhash, - previous_blockhash: block.parent_blockhash, - commitment_config, - // leader_id, - parent_slot: block.parent_slot, - slot: block.slot, - // rewards, - } -} diff --git a/src/experimental/mod.rs b/src/experimental/mod.rs deleted file mode 100644 index cda3fb8..0000000 --- a/src/experimental/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod mock_literpc_core; diff --git a/src/lib.rs b/src/lib.rs index b879b7c..91794cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -pub mod experimental; pub mod grpc_subscription; pub mod grpc_subscription_autoreconnect; pub mod grpcmultiplex_fastestwins;