diff --git a/Cargo.lock b/Cargo.lock index b3550da..76cded2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2437,6 +2437,21 @@ dependencies = [ "syn 2.0.65", ] +[[package]] +name = "quic-geyser-block-builder" +version = "0.1.3" +dependencies = [ + "anyhow", + "bincode", + "itertools", + "log", + "quic-geyser-common", + "quic-geyser-server", + "rand 0.8.5", + "solana-sdk", + "tracing-subscriber", +] + [[package]] name = "quic-geyser-blocking-client" version = "0.1.3" @@ -2482,6 +2497,7 @@ name = "quic-geyser-common" version = "0.1.3" dependencies = [ "anyhow", + "bincode", "itertools", "log", "lz4", diff --git a/Cargo.toml b/Cargo.toml index 6f95bae..7cf2ea2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "examples/tester-client", "examples/tester-server", "proxy", + "block-builder", ] [workspace.package] @@ -62,6 +63,7 @@ quic-geyser-plugin = {path = "plugin", version="0.1.3"} quic-geyser-server = {path = "server", version="0.1.3"} quic-geyser-quiche-utils = {path = "quiche", version = "0.1.3"} quic-geyser-blocking-client = {path = "blocking_client", version = "0.1.3"} +quic-geyser-block-builder = {path = "blocking_client", version = "0.1.3"} [profile.release] debug = true diff --git a/block-builder/Cargo.toml b/block-builder/Cargo.toml new file mode 100644 index 0000000..71855fc --- /dev/null +++ b/block-builder/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "quic-geyser-block-builder" +version = "0.1.3" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] + +solana-sdk = { workspace = true } +anyhow = { workspace = true } +log = { workspace = true } + +quic-geyser-common = { workspace = true } +bincode = { workspace = true } +itertools = { workspace = true } + +[dev-dependencies] +rand = { workspace = true } +tracing-subscriber = { workspace = true } +itertools = { workspace = true } +quic-geyser-server = { workspace = true } \ No newline at end of file diff --git a/block-builder/src/block_builder.rs b/block-builder/src/block_builder.rs new file mode 100644 index 0000000..c85c738 --- /dev/null +++ b/block-builder/src/block_builder.rs @@ -0,0 +1,152 @@ +use std::{collections::{BTreeMap, HashMap}, sync::mpsc::{Receiver, Sender}}; + +use itertools::Itertools; +use quic_geyser_common::{channel_message::{AccountData, ChannelMessage}, compression::CompressionType, message::Message, types::{account::Account, block::Block, block_meta::BlockMeta, slot_identifier::SlotIdentifier, transaction::Transaction}}; +use solana_sdk::pubkey::Pubkey; + +pub fn start_block_building_thread(channel_messages: Receiver, output: Sender, compression_type: CompressionType) { + std::thread::spawn(move || { + build_blocks(channel_messages, output, compression_type); + }); +} + +#[derive(Default)] +struct PartialBlock { + meta: Option, + transactions: Vec, + account_updates: HashMap, +} + +pub fn build_blocks(channel_messages: Receiver, output: Sender, compression_type: CompressionType) { + let mut partially_build_blocks = BTreeMap::::new(); + while let Ok(channel_message) = channel_messages.recv() { + match channel_message { + ChannelMessage::Account(account_data, slot) => { + if let Some(lowest) = partially_build_blocks.first_entry() { + if *lowest.key() > slot { + log::error!("Account update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), slot); + } + } + // save account updates + let partial_block = match partially_build_blocks.get_mut(&slot) { + Some(pb) => pb, + None => { + partially_build_blocks.insert(slot, PartialBlock::default()); + partially_build_blocks.get_mut(&slot).unwrap() + } + }; + let update = match partial_block.account_updates.get(&account_data.pubkey) { + Some(prev_update) => { + prev_update.write_version < account_data.write_version + }, + None => { + true + }, + }; + if update { + partial_block.account_updates.insert(account_data.pubkey, account_data); + } + }, + ChannelMessage::Slot(slot, _parent_slot, commitment) => { + if commitment.is_confirmed() || commitment.is_finalized() { + // dispactch partially build blocks if not already dispatched + dispatch_partial_block(&mut partially_build_blocks, slot, &output, compression_type); + } + }, + ChannelMessage::BlockMeta(meta) => { + let slot = meta.slot; + if let Some(lowest) = partially_build_blocks.first_entry() { + if *lowest.key() > meta.slot { + log::error!("Blockmeta update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), meta.slot); + } + } + // save account updates + let dispatch = { + let partial_block = match partially_build_blocks.get_mut(&meta.slot) { + Some(pb) => pb, + None => { + partially_build_blocks.insert(meta.slot, PartialBlock::default()); + partially_build_blocks.get_mut(&meta.slot).unwrap() + } + }; + let meta_tx_count = meta.executed_transaction_count; + if partial_block.meta.is_some() { + log::error!("Block meta has already been set"); + } else { + partial_block.meta = Some(meta); + } + meta_tx_count == partial_block.transactions.len() as u64 + }; + if dispatch { + dispatch_partial_block(&mut partially_build_blocks, slot, &output, compression_type); + } + }, + ChannelMessage::Transaction(transaction) => { + let slot = transaction.slot_identifier.slot; + if let Some(lowest) = partially_build_blocks.first_entry() { + if *lowest.key() > transaction.slot_identifier.slot { + log::error!("Transactions update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), slot); + } + } + // save account updates + let (transactions_length_in_pb, meta_transactions_count) = { + let partial_block = match partially_build_blocks.get_mut(&slot) { + Some(pb) => pb, + None => { + partially_build_blocks.insert(slot, PartialBlock::default()); + partially_build_blocks.get_mut(&slot).unwrap() + } + }; + partial_block.transactions.push(*transaction); + + let meta_transactions_count = partial_block.meta.as_ref().map(|x| x.executed_transaction_count); + + (partial_block.transactions.len() as u64, meta_transactions_count) + }; + + if Some(transactions_length_in_pb) == meta_transactions_count { + // check if all transactions are taken into account + dispatch_partial_block(&mut partially_build_blocks, slot, &output, compression_type); + } + }, + } + } +} + +fn dispatch_partial_block(partial_blocks: &mut BTreeMap::, slot: u64, output: &Sender, compression_type: CompressionType) { + if let Some(dispatched_partial_block) = partial_blocks.remove(&slot) { + let Some(meta) = dispatched_partial_block.meta else { + log::error!("Block was dispactched without any meta data/ cannot dispatch the block {slot}"); + return; + }; + let transactions = dispatched_partial_block.transactions; + if transactions.len() != meta.executed_transaction_count as usize { + log::error!("for block at slot {slot} transaction size mismatch {}!={}", transactions.len(), meta.executed_transaction_count); + } + let accounts = dispatched_partial_block.account_updates.iter().map(|(pubkey, account_data)| { + let data_length = account_data.account.data.len() as u64; + Account { + slot_identifier: SlotIdentifier { + slot, + }, + pubkey: *pubkey, + owner: account_data.account.owner, + lamports: account_data.account.lamports, + executable: account_data.account.executable, + rent_epoch: account_data.account.rent_epoch, + write_version: account_data.write_version, + data: account_data.account.data.clone(), + compression_type: CompressionType::None, + data_length, + } + }).collect_vec(); + match Block::build(meta, transactions, accounts, compression_type) { + Ok(block) => { + output.send(Message::Block(block)).unwrap(); + }, + Err(e) => { + log::error!("block building failed because of error: {e}") + } + } + } +} \ No newline at end of file diff --git a/block-builder/src/lib.rs b/block-builder/src/lib.rs new file mode 100644 index 0000000..3b601e8 --- /dev/null +++ b/block-builder/src/lib.rs @@ -0,0 +1,2 @@ +pub mod block_builder; +mod tests; \ No newline at end of file diff --git a/block-builder/src/tests.rs b/block-builder/src/tests.rs new file mode 100644 index 0000000..692ab23 --- /dev/null +++ b/block-builder/src/tests.rs @@ -0,0 +1,128 @@ +#[cfg(test)] +mod tests { + use std::{sync::mpsc::channel, vec}; + + use itertools::Itertools; + use quic_geyser_common::{channel_message::{AccountData, ChannelMessage}, types::{block_meta::BlockMeta, slot_identifier::SlotIdentifier, transaction::{Transaction, TransactionMeta}}}; + use solana_sdk::{account::Account, hash::Hash, message::{v0::{LoadedAddresses, Message}, MessageHeader}, pubkey::Pubkey, signature::Signature}; + + use crate::block_builder::start_block_building_thread; + + #[test] + fn test_block_creation() { + let (channelmsg_sx, cm_rx) = channel(); + let (ms_sx, msg_rx) = channel(); + start_block_building_thread(cm_rx, ms_sx, quic_geyser_common::compression::CompressionType::None); + + let acc1_pk = Pubkey::new_unique(); + let acc1 = ChannelMessage::Account( + AccountData { + pubkey: acc1_pk, + account: Account { + lamports: 12345, + data: (0..100).map(|_| rand::random::()).collect_vec(), + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: u64::MAX, + }, + write_version: 1, + }, + 5, + ); + let acc2 = ChannelMessage::Account( + AccountData { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: 12345, + data: (0..100).map(|_| rand::random::()).collect_vec(), + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: u64::MAX, + }, + write_version: 1, + }, + 5, + ); + + let acc3 = ChannelMessage::Account( + AccountData { + pubkey: acc1_pk, + account: Account { + lamports: 12345, + data: (0..100).map(|_| rand::random::()).collect_vec(), + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: u64::MAX, + }, + write_version: 2, + }, + 5, + ); + + let acc4 = ChannelMessage::Account( + AccountData { + pubkey: acc1_pk, + account: Account { + lamports: 12345, + data: (0..100).map(|_| rand::random::()).collect_vec(), + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: u64::MAX, + }, + write_version: 0, + }, + 5, + ); + channelmsg_sx.send(acc1.clone()).unwrap(); + channelmsg_sx.send(acc2.clone()).unwrap(); + channelmsg_sx.send(acc3.clone()).unwrap(); + channelmsg_sx.send(acc4.clone()).unwrap(); + + let block_meta = BlockMeta { + parent_slot: 4, + slot: 5, + parent_blockhash: Hash::new_unique().to_string(), + blockhash: Hash::new_unique().to_string(), + rewards: vec![], + block_height: Some(4), + executed_transaction_count: 2, + entries_count: 2, + }; + channelmsg_sx.send(ChannelMessage::BlockMeta(block_meta)).unwrap(); + + let tx1 = Transaction { + slot_identifier: SlotIdentifier { + slot: 5 + }, + signatures: vec![Signature::new_unique()], + message: Message{ + header: MessageHeader { + num_required_signatures: 1, + num_readonly_signed_accounts: 0, + num_readonly_unsigned_accounts: 0, + }, + account_keys: vec![acc1_pk], + recent_blockhash: Hash::new_unique(), + instructions: vec![], + address_table_lookups: vec![], + }, + is_vote: false, + transasction_meta: TransactionMeta { + error: None, + fee: 0, + pre_balances: vec![], + post_balances: vec![], + inner_instructions: None, + log_messages: Some(vec!["toto".to_string()]), + rewards: None, + loaded_addresses: LoadedAddresses { + writable: vec![], + readonly: vec![] + }, + return_data: None, + compute_units_consumed: Some(1234), + }, + index: 0, + }; + } +} \ No newline at end of file diff --git a/blocking_client/src/configure_client.rs b/blocking_client/src/configure_client.rs index cb1832e..37e61fd 100644 --- a/blocking_client/src/configure_client.rs +++ b/blocking_client/src/configure_client.rs @@ -25,5 +25,6 @@ pub fn configure_client( config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2); config.set_max_ack_delay(maximum_ack_delay); config.set_ack_delay_exponent(ack_exponent); + config.enable_pacing(false); Ok(config) } diff --git a/blocking_client/src/quiche_client_loop.rs b/blocking_client/src/quiche_client_loop.rs index 3ee16e1..2ee7cd6 100644 --- a/blocking_client/src/quiche_client_loop.rs +++ b/blocking_client/src/quiche_client_loop.rs @@ -147,7 +147,7 @@ pub fn create_quiche_client_thread( let mut instance = Instant::now(); 'client: loop { - poll.poll(&mut events, Some(Duration::from_micros(100))) + poll.poll(&mut events, Some(Duration::from_millis(10))) .unwrap(); if events.is_empty() { connection.on_timeout(); @@ -303,7 +303,7 @@ mod tests { let message_1 = ChannelMessage::Slot( 3, 2, - solana_sdk::commitment_config::CommitmentLevel::Confirmed, + solana_sdk::commitment_config::CommitmentConfig::confirmed(), ); let message_2 = ChannelMessage::Account( AccountData { @@ -424,7 +424,7 @@ mod tests { Message::SlotMsg(SlotMeta { slot: 3, parent: 2, - commitment_level: solana_sdk::commitment_config::CommitmentLevel::Confirmed + commitment_config: solana_sdk::commitment_config::CommitmentConfig::confirmed() }) ); diff --git a/common/Cargo.toml b/common/Cargo.toml index f648819..be05b4b 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -13,6 +13,7 @@ log = { workspace = true } thiserror = {workspace = true} itertools = { workspace = true } lz4 = { workspace = true } +bincode = { workspace = true } [dev-dependencies] rand = { workspace = true } diff --git a/common/src/channel_message.rs b/common/src/channel_message.rs index cd93c9e..738fe03 100644 --- a/common/src/channel_message.rs +++ b/common/src/channel_message.rs @@ -1,5 +1,5 @@ use solana_sdk::{ - account::Account, clock::Slot, commitment_config::CommitmentLevel, pubkey::Pubkey, + account::Account, clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, }; use crate::types::{block_meta::BlockMeta, transaction::Transaction}; @@ -14,7 +14,7 @@ pub struct AccountData { #[derive(Debug, Clone, PartialEq, Eq)] pub enum ChannelMessage { Account(AccountData, Slot), - Slot(u64, u64, CommitmentLevel), + Slot(u64, u64, CommitmentConfig), BlockMeta(BlockMeta), Transaction(Box), } diff --git a/common/src/message.rs b/common/src/message.rs index 383cd18..d61859d 100644 --- a/common/src/message.rs +++ b/common/src/message.rs @@ -3,9 +3,7 @@ use serde::{Deserialize, Serialize}; use crate::{ filters::Filter, types::{ - account::Account, - block_meta::{BlockMeta, SlotMeta}, - transaction::Transaction, + account::Account, block::Block, block_meta::{BlockMeta, SlotMeta}, transaction::Transaction }, }; @@ -20,5 +18,5 @@ pub enum Message { BlockMetaMsg(BlockMeta), TransactionMsg(Box), Filters(Vec), // sent from client to server - Ping, + Block(Block), } diff --git a/common/src/types/block.rs b/common/src/types/block.rs new file mode 100644 index 0000000..1f54c5d --- /dev/null +++ b/common/src/types/block.rs @@ -0,0 +1,65 @@ +use serde::{Deserialize, Serialize}; + +use crate::compression::CompressionType; + +use super::{account::Account, block_meta::BlockMeta, transaction::Transaction}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +pub struct Block { + pub meta: BlockMeta, + pub transactions: Vec, // compressed transaction::Transaction + pub accounts_updated_in_block: Vec, // compressed account::Account + pub accounts_updated_count : u64, + pub compression_type: CompressionType, +} + +impl Block { + pub fn build(meta: BlockMeta, transactions: Vec, accounts: Vec, compression_type: CompressionType) -> anyhow::Result { + let accounts_count = accounts.len() as u64; + let transactions_binary = bincode::serialize(&transactions)?; + let transactions_compressed = compression_type.compress(&transactions_binary); + let accounts_binary = bincode::serialize(&accounts)?; + let accounts_compressed = compression_type.compress(&accounts_binary); + Ok( + Self { + meta, + transactions: transactions_compressed, + accounts_updated_in_block: accounts_compressed, + accounts_updated_count: accounts_count, + compression_type, + } + ) + } + + pub fn get_transactions(&self) -> anyhow::Result> { + let transactions = match self.compression_type { + CompressionType::None => { + bincode::deserialize::>(&self.transactions)? + }, + CompressionType::Lz4Fast(_) | CompressionType::Lz4(_) => { + let data = lz4::block::decompress(&self.transactions, None)?; + bincode::deserialize::>(&data)? + } + }; + if transactions.len() != self.meta.executed_transaction_count as usize { + log::error!("transactions vector size is not equal to expected size in meta {} != {}", transactions.len(), self.meta.executed_transaction_count); + } + Ok(transactions) + } + + pub fn get_accounts(&self) -> anyhow::Result> { + let accounts = match self.compression_type { + CompressionType::None => { + bincode::deserialize::>(&self.accounts_updated_in_block)? + }, + CompressionType::Lz4Fast(_) | CompressionType::Lz4(_) => { + let data = lz4::block::decompress(&self.accounts_updated_in_block, None)?; + bincode::deserialize::>(&data)? + } + }; + if accounts.len() != self.accounts_updated_count as usize { + log::error!("accounts vector size is not equal to expected {} != {}", accounts.len(), self.accounts_updated_count); + } + Ok(accounts) + } +} \ No newline at end of file diff --git a/common/src/types/block_meta.rs b/common/src/types/block_meta.rs index fcefd0b..f7610f4 100644 --- a/common/src/types/block_meta.rs +++ b/common/src/types/block_meta.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use solana_sdk::commitment_config::CommitmentLevel; +use solana_sdk::commitment_config::CommitmentConfig; use solana_transaction_status::Reward; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] @@ -7,7 +7,7 @@ use solana_transaction_status::Reward; pub struct SlotMeta { pub slot: u64, pub parent: u64, - pub commitment_level: CommitmentLevel, + pub commitment_config: CommitmentConfig, } #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] diff --git a/common/src/types/mod.rs b/common/src/types/mod.rs index 4558ff6..4c930db 100644 --- a/common/src/types/mod.rs +++ b/common/src/types/mod.rs @@ -3,3 +3,4 @@ pub mod block_meta; pub mod connections_parameters; pub mod slot_identifier; pub mod transaction; +pub mod block; \ No newline at end of file diff --git a/plugin/src/quic_plugin.rs b/plugin/src/quic_plugin.rs index 3b0f7ef..7059563 100644 --- a/plugin/src/quic_plugin.rs +++ b/plugin/src/quic_plugin.rs @@ -13,7 +13,7 @@ use quic_geyser_common::{ }; use quic_geyser_server::quic_server::QuicServer; use solana_sdk::{ - account::Account, clock::Slot, commitment_config::CommitmentLevel, message::v0::Message, + account::Account, clock::Slot, commitment_config::CommitmentConfig, message::v0::Message, pubkey::Pubkey, }; @@ -103,9 +103,9 @@ impl GeyserPlugin for QuicGeyserPlugin { return Ok(()); }; let commitment_level = match status { - SlotStatus::Processed => CommitmentLevel::Processed, - SlotStatus::Rooted => CommitmentLevel::Finalized, - SlotStatus::Confirmed => CommitmentLevel::Confirmed, + SlotStatus::Processed => CommitmentConfig::processed(), + SlotStatus::Rooted => CommitmentConfig::finalized(), + SlotStatus::Confirmed => CommitmentConfig::confirmed(), }; let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level); quic_server diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 9176b6b..a9e24ef 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -94,7 +94,7 @@ pub fn main() -> anyhow::Result<()> { quic_geyser_common::message::Message::SlotMsg(slot_message) => ChannelMessage::Slot( slot_message.slot, slot_message.parent, - slot_message.commitment_level, + slot_message.commitment_config, ), quic_geyser_common::message::Message::BlockMetaMsg(block_meta_message) => { ChannelMessage::BlockMeta(BlockMeta { diff --git a/quiche/src/quiche_utils.rs b/quiche/src/quiche_utils.rs index 1e988f4..4f9acf3 100644 --- a/quiche/src/quiche_utils.rs +++ b/quiche/src/quiche_utils.rs @@ -75,7 +75,7 @@ pub fn get_next_unidi( break; } - if is_unidi(stream_id, is_server) && !is_bidi(stream_id) { + if is_unidi(stream_id, is_server) { return stream_id; } } diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index 703f95c..8c6c21e 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -98,7 +98,7 @@ pub fn server_loop( ); loop { - poll.poll(&mut events, Some(Duration::from_micros(100)))?; + poll.poll(&mut events, Some(Duration::from_millis(10)))?; 'read: loop { let (len, from) = match socket.recv_from(&mut buf) { Ok(v) => v, @@ -403,9 +403,6 @@ fn create_client_task( let mut filter_lk = filters.write().unwrap(); filter_lk.append(&mut f); } - Message::Ping => { - // got ping - } _ => { log::error!("unknown message from the client"); } @@ -556,11 +553,11 @@ fn create_dispatching_thread( (Message::AccountMsg(geyser_account), 4) } - ChannelMessage::Slot(slot, parent, commitment_level) => ( + ChannelMessage::Slot(slot, parent, commitment_config) => ( Message::SlotMsg(SlotMeta { slot, parent, - commitment_level, + commitment_config, }), 1, ),