Creating a block builder task

This commit is contained in:
godmodegalactus 2024-06-03 16:47:09 +02:00
parent e3d1a3fa0b
commit 92ae7ab4cd
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
18 changed files with 408 additions and 23 deletions

16
Cargo.lock generated
View File

@ -2437,6 +2437,21 @@ dependencies = [
"syn 2.0.65",
]
[[package]]
name = "quic-geyser-block-builder"
version = "0.1.3"
dependencies = [
"anyhow",
"bincode",
"itertools",
"log",
"quic-geyser-common",
"quic-geyser-server",
"rand 0.8.5",
"solana-sdk",
"tracing-subscriber",
]
[[package]]
name = "quic-geyser-blocking-client"
version = "0.1.3"
@ -2482,6 +2497,7 @@ name = "quic-geyser-common"
version = "0.1.3"
dependencies = [
"anyhow",
"bincode",
"itertools",
"log",
"lz4",

View File

@ -10,6 +10,7 @@ members = [
"examples/tester-client",
"examples/tester-server",
"proxy",
"block-builder",
]
[workspace.package]
@ -62,6 +63,7 @@ quic-geyser-plugin = {path = "plugin", version="0.1.3"}
quic-geyser-server = {path = "server", version="0.1.3"}
quic-geyser-quiche-utils = {path = "quiche", version = "0.1.3"}
quic-geyser-blocking-client = {path = "blocking_client", version = "0.1.3"}
quic-geyser-block-builder = {path = "blocking_client", version = "0.1.3"}
[profile.release]
debug = true

22
block-builder/Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "quic-geyser-block-builder"
version = "0.1.3"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
solana-sdk = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
quic-geyser-common = { workspace = true }
bincode = { workspace = true }
itertools = { workspace = true }
[dev-dependencies]
rand = { workspace = true }
tracing-subscriber = { workspace = true }
itertools = { workspace = true }
quic-geyser-server = { workspace = true }

View File

@ -0,0 +1,152 @@
use std::{collections::{BTreeMap, HashMap}, sync::mpsc::{Receiver, Sender}};
use itertools::Itertools;
use quic_geyser_common::{channel_message::{AccountData, ChannelMessage}, compression::CompressionType, message::Message, types::{account::Account, block::Block, block_meta::BlockMeta, slot_identifier::SlotIdentifier, transaction::Transaction}};
use solana_sdk::pubkey::Pubkey;
pub fn start_block_building_thread(channel_messages: Receiver<ChannelMessage>, output: Sender<Message>, compression_type: CompressionType) {
std::thread::spawn(move || {
build_blocks(channel_messages, output, compression_type);
});
}
#[derive(Default)]
struct PartialBlock {
meta: Option<BlockMeta>,
transactions: Vec<Transaction>,
account_updates: HashMap<Pubkey, AccountData>,
}
pub fn build_blocks(channel_messages: Receiver<ChannelMessage>, output: Sender<Message>, compression_type: CompressionType) {
let mut partially_build_blocks = BTreeMap::<u64, PartialBlock>::new();
while let Ok(channel_message) = channel_messages.recv() {
match channel_message {
ChannelMessage::Account(account_data, slot) => {
if let Some(lowest) = partially_build_blocks.first_entry() {
if *lowest.key() > slot {
log::error!("Account update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), slot);
}
}
// save account updates
let partial_block = match partially_build_blocks.get_mut(&slot) {
Some(pb) => pb,
None => {
partially_build_blocks.insert(slot, PartialBlock::default());
partially_build_blocks.get_mut(&slot).unwrap()
}
};
let update = match partial_block.account_updates.get(&account_data.pubkey) {
Some(prev_update) => {
prev_update.write_version < account_data.write_version
},
None => {
true
},
};
if update {
partial_block.account_updates.insert(account_data.pubkey, account_data);
}
},
ChannelMessage::Slot(slot, _parent_slot, commitment) => {
if commitment.is_confirmed() || commitment.is_finalized() {
// dispactch partially build blocks if not already dispatched
dispatch_partial_block(&mut partially_build_blocks, slot, &output, compression_type);
}
},
ChannelMessage::BlockMeta(meta) => {
let slot = meta.slot;
if let Some(lowest) = partially_build_blocks.first_entry() {
if *lowest.key() > meta.slot {
log::error!("Blockmeta update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), meta.slot);
}
}
// save account updates
let dispatch = {
let partial_block = match partially_build_blocks.get_mut(&meta.slot) {
Some(pb) => pb,
None => {
partially_build_blocks.insert(meta.slot, PartialBlock::default());
partially_build_blocks.get_mut(&meta.slot).unwrap()
}
};
let meta_tx_count = meta.executed_transaction_count;
if partial_block.meta.is_some() {
log::error!("Block meta has already been set");
} else {
partial_block.meta = Some(meta);
}
meta_tx_count == partial_block.transactions.len() as u64
};
if dispatch {
dispatch_partial_block(&mut partially_build_blocks, slot, &output, compression_type);
}
},
ChannelMessage::Transaction(transaction) => {
let slot = transaction.slot_identifier.slot;
if let Some(lowest) = partially_build_blocks.first_entry() {
if *lowest.key() > transaction.slot_identifier.slot {
log::error!("Transactions update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), slot);
}
}
// save account updates
let (transactions_length_in_pb, meta_transactions_count) = {
let partial_block = match partially_build_blocks.get_mut(&slot) {
Some(pb) => pb,
None => {
partially_build_blocks.insert(slot, PartialBlock::default());
partially_build_blocks.get_mut(&slot).unwrap()
}
};
partial_block.transactions.push(*transaction);
let meta_transactions_count = partial_block.meta.as_ref().map(|x| x.executed_transaction_count);
(partial_block.transactions.len() as u64, meta_transactions_count)
};
if Some(transactions_length_in_pb) == meta_transactions_count {
// check if all transactions are taken into account
dispatch_partial_block(&mut partially_build_blocks, slot, &output, compression_type);
}
},
}
}
}
fn dispatch_partial_block(partial_blocks: &mut BTreeMap::<u64, PartialBlock>, slot: u64, output: &Sender<Message>, compression_type: CompressionType) {
if let Some(dispatched_partial_block) = partial_blocks.remove(&slot) {
let Some(meta) = dispatched_partial_block.meta else {
log::error!("Block was dispactched without any meta data/ cannot dispatch the block {slot}");
return;
};
let transactions = dispatched_partial_block.transactions;
if transactions.len() != meta.executed_transaction_count as usize {
log::error!("for block at slot {slot} transaction size mismatch {}!={}", transactions.len(), meta.executed_transaction_count);
}
let accounts = dispatched_partial_block.account_updates.iter().map(|(pubkey, account_data)| {
let data_length = account_data.account.data.len() as u64;
Account {
slot_identifier: SlotIdentifier {
slot,
},
pubkey: *pubkey,
owner: account_data.account.owner,
lamports: account_data.account.lamports,
executable: account_data.account.executable,
rent_epoch: account_data.account.rent_epoch,
write_version: account_data.write_version,
data: account_data.account.data.clone(),
compression_type: CompressionType::None,
data_length,
}
}).collect_vec();
match Block::build(meta, transactions, accounts, compression_type) {
Ok(block) => {
output.send(Message::Block(block)).unwrap();
},
Err(e) => {
log::error!("block building failed because of error: {e}")
}
}
}
}

2
block-builder/src/lib.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod block_builder;
mod tests;

128
block-builder/src/tests.rs Normal file
View File

@ -0,0 +1,128 @@
#[cfg(test)]
mod tests {
use std::{sync::mpsc::channel, vec};
use itertools::Itertools;
use quic_geyser_common::{channel_message::{AccountData, ChannelMessage}, types::{block_meta::BlockMeta, slot_identifier::SlotIdentifier, transaction::{Transaction, TransactionMeta}}};
use solana_sdk::{account::Account, hash::Hash, message::{v0::{LoadedAddresses, Message}, MessageHeader}, pubkey::Pubkey, signature::Signature};
use crate::block_builder::start_block_building_thread;
#[test]
fn test_block_creation() {
let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel();
start_block_building_thread(cm_rx, ms_sx, quic_geyser_common::compression::CompressionType::None);
let acc1_pk = Pubkey::new_unique();
let acc1 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
);
let acc2 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
);
let acc3 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 2,
},
5,
);
let acc4 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 0,
},
5,
);
channelmsg_sx.send(acc1.clone()).unwrap();
channelmsg_sx.send(acc2.clone()).unwrap();
channelmsg_sx.send(acc3.clone()).unwrap();
channelmsg_sx.send(acc4.clone()).unwrap();
let block_meta = BlockMeta {
parent_slot: 4,
slot: 5,
parent_blockhash: Hash::new_unique().to_string(),
blockhash: Hash::new_unique().to_string(),
rewards: vec![],
block_height: Some(4),
executed_transaction_count: 2,
entries_count: 2,
};
channelmsg_sx.send(ChannelMessage::BlockMeta(block_meta)).unwrap();
let tx1 = Transaction {
slot_identifier: SlotIdentifier {
slot: 5
},
signatures: vec![Signature::new_unique()],
message: Message{
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![]
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
}
}

View File

@ -25,5 +25,6 @@ pub fn configure_client(
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2);
config.set_max_ack_delay(maximum_ack_delay);
config.set_ack_delay_exponent(ack_exponent);
config.enable_pacing(false);
Ok(config)
}

View File

@ -147,7 +147,7 @@ pub fn create_quiche_client_thread(
let mut instance = Instant::now();
'client: loop {
poll.poll(&mut events, Some(Duration::from_micros(100)))
poll.poll(&mut events, Some(Duration::from_millis(10)))
.unwrap();
if events.is_empty() {
connection.on_timeout();
@ -303,7 +303,7 @@ mod tests {
let message_1 = ChannelMessage::Slot(
3,
2,
solana_sdk::commitment_config::CommitmentLevel::Confirmed,
solana_sdk::commitment_config::CommitmentConfig::confirmed(),
);
let message_2 = ChannelMessage::Account(
AccountData {
@ -424,7 +424,7 @@ mod tests {
Message::SlotMsg(SlotMeta {
slot: 3,
parent: 2,
commitment_level: solana_sdk::commitment_config::CommitmentLevel::Confirmed
commitment_config: solana_sdk::commitment_config::CommitmentConfig::confirmed()
})
);

View File

@ -13,6 +13,7 @@ log = { workspace = true }
thiserror = {workspace = true}
itertools = { workspace = true }
lz4 = { workspace = true }
bincode = { workspace = true }
[dev-dependencies]
rand = { workspace = true }

View File

@ -1,5 +1,5 @@
use solana_sdk::{
account::Account, clock::Slot, commitment_config::CommitmentLevel, pubkey::Pubkey,
account::Account, clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey,
};
use crate::types::{block_meta::BlockMeta, transaction::Transaction};
@ -14,7 +14,7 @@ pub struct AccountData {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelMessage {
Account(AccountData, Slot),
Slot(u64, u64, CommitmentLevel),
Slot(u64, u64, CommitmentConfig),
BlockMeta(BlockMeta),
Transaction(Box<Transaction>),
}

View File

@ -3,9 +3,7 @@ use serde::{Deserialize, Serialize};
use crate::{
filters::Filter,
types::{
account::Account,
block_meta::{BlockMeta, SlotMeta},
transaction::Transaction,
account::Account, block::Block, block_meta::{BlockMeta, SlotMeta}, transaction::Transaction
},
};
@ -20,5 +18,5 @@ pub enum Message {
BlockMetaMsg(BlockMeta),
TransactionMsg(Box<Transaction>),
Filters(Vec<Filter>), // sent from client to server
Ping,
Block(Block),
}

65
common/src/types/block.rs Normal file
View File

@ -0,0 +1,65 @@
use serde::{Deserialize, Serialize};
use crate::compression::CompressionType;
use super::{account::Account, block_meta::BlockMeta, transaction::Transaction};
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Block {
pub meta: BlockMeta,
pub transactions: Vec<u8>, // compressed transaction::Transaction
pub accounts_updated_in_block: Vec<u8>, // compressed account::Account
pub accounts_updated_count : u64,
pub compression_type: CompressionType,
}
impl Block {
pub fn build(meta: BlockMeta, transactions: Vec<Transaction>, accounts: Vec<Account>, compression_type: CompressionType) -> anyhow::Result<Self> {
let accounts_count = accounts.len() as u64;
let transactions_binary = bincode::serialize(&transactions)?;
let transactions_compressed = compression_type.compress(&transactions_binary);
let accounts_binary = bincode::serialize(&accounts)?;
let accounts_compressed = compression_type.compress(&accounts_binary);
Ok(
Self {
meta,
transactions: transactions_compressed,
accounts_updated_in_block: accounts_compressed,
accounts_updated_count: accounts_count,
compression_type,
}
)
}
pub fn get_transactions(&self) -> anyhow::Result<Vec<Transaction>> {
let transactions = match self.compression_type {
CompressionType::None => {
bincode::deserialize::<Vec<Transaction>>(&self.transactions)?
},
CompressionType::Lz4Fast(_) | CompressionType::Lz4(_) => {
let data = lz4::block::decompress(&self.transactions, None)?;
bincode::deserialize::<Vec<Transaction>>(&data)?
}
};
if transactions.len() != self.meta.executed_transaction_count as usize {
log::error!("transactions vector size is not equal to expected size in meta {} != {}", transactions.len(), self.meta.executed_transaction_count);
}
Ok(transactions)
}
pub fn get_accounts(&self) -> anyhow::Result<Vec<Account>> {
let accounts = match self.compression_type {
CompressionType::None => {
bincode::deserialize::<Vec<Account>>(&self.accounts_updated_in_block)?
},
CompressionType::Lz4Fast(_) | CompressionType::Lz4(_) => {
let data = lz4::block::decompress(&self.accounts_updated_in_block, None)?;
bincode::deserialize::<Vec<Account>>(&data)?
}
};
if accounts.len() != self.accounts_updated_count as usize {
log::error!("accounts vector size is not equal to expected {} != {}", accounts.len(), self.accounts_updated_count);
}
Ok(accounts)
}
}

View File

@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize};
use solana_sdk::commitment_config::CommitmentLevel;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::Reward;
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
@ -7,7 +7,7 @@ use solana_transaction_status::Reward;
pub struct SlotMeta {
pub slot: u64,
pub parent: u64,
pub commitment_level: CommitmentLevel,
pub commitment_config: CommitmentConfig,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]

View File

@ -3,3 +3,4 @@ pub mod block_meta;
pub mod connections_parameters;
pub mod slot_identifier;
pub mod transaction;
pub mod block;

View File

@ -13,7 +13,7 @@ use quic_geyser_common::{
};
use quic_geyser_server::quic_server::QuicServer;
use solana_sdk::{
account::Account, clock::Slot, commitment_config::CommitmentLevel, message::v0::Message,
account::Account, clock::Slot, commitment_config::CommitmentConfig, message::v0::Message,
pubkey::Pubkey,
};
@ -103,9 +103,9 @@ impl GeyserPlugin for QuicGeyserPlugin {
return Ok(());
};
let commitment_level = match status {
SlotStatus::Processed => CommitmentLevel::Processed,
SlotStatus::Rooted => CommitmentLevel::Finalized,
SlotStatus::Confirmed => CommitmentLevel::Confirmed,
SlotStatus::Processed => CommitmentConfig::processed(),
SlotStatus::Rooted => CommitmentConfig::finalized(),
SlotStatus::Confirmed => CommitmentConfig::confirmed(),
};
let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level);
quic_server

View File

@ -94,7 +94,7 @@ pub fn main() -> anyhow::Result<()> {
quic_geyser_common::message::Message::SlotMsg(slot_message) => ChannelMessage::Slot(
slot_message.slot,
slot_message.parent,
slot_message.commitment_level,
slot_message.commitment_config,
),
quic_geyser_common::message::Message::BlockMetaMsg(block_meta_message) => {
ChannelMessage::BlockMeta(BlockMeta {

View File

@ -75,7 +75,7 @@ pub fn get_next_unidi(
break;
}
if is_unidi(stream_id, is_server) && !is_bidi(stream_id) {
if is_unidi(stream_id, is_server) {
return stream_id;
}
}

View File

@ -98,7 +98,7 @@ pub fn server_loop(
);
loop {
poll.poll(&mut events, Some(Duration::from_micros(100)))?;
poll.poll(&mut events, Some(Duration::from_millis(10)))?;
'read: loop {
let (len, from) = match socket.recv_from(&mut buf) {
Ok(v) => v,
@ -403,9 +403,6 @@ fn create_client_task(
let mut filter_lk = filters.write().unwrap();
filter_lk.append(&mut f);
}
Message::Ping => {
// got ping
}
_ => {
log::error!("unknown message from the client");
}
@ -556,11 +553,11 @@ fn create_dispatching_thread(
(Message::AccountMsg(geyser_account), 4)
}
ChannelMessage::Slot(slot, parent, commitment_level) => (
ChannelMessage::Slot(slot, parent, commitment_config) => (
Message::SlotMsg(SlotMeta {
slot,
parent,
commitment_level,
commitment_config,
}),
1,
),