Merge pull request #3 from blockworks-foundation/creating_block_builder_task

Creating block builder task
This commit is contained in:
galactus 2024-06-05 17:54:25 +02:00 committed by GitHub
commit 669f85cbd6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 1418 additions and 95 deletions

17
Cargo.lock generated
View File

@ -2437,6 +2437,21 @@ dependencies = [
"syn 2.0.65",
]
[[package]]
name = "quic-geyser-block-builder"
version = "0.1.3"
dependencies = [
"anyhow",
"bincode",
"itertools",
"log",
"quic-geyser-common",
"quic-geyser-server",
"rand 0.8.5",
"solana-sdk",
"tracing-subscriber",
]
[[package]]
name = "quic-geyser-blocking-client"
version = "0.1.3"
@ -2482,6 +2497,7 @@ name = "quic-geyser-common"
version = "0.1.3"
dependencies = [
"anyhow",
"bincode",
"itertools",
"log",
"lz4",
@ -2503,6 +2519,7 @@ dependencies = [
"clap",
"git-version",
"log",
"quic-geyser-block-builder",
"quic-geyser-common",
"quic-geyser-server",
"serde",

View File

@ -10,6 +10,7 @@ members = [
"examples/tester-client",
"examples/tester-server",
"proxy",
"block-builder",
]
[workspace.package]
@ -62,6 +63,7 @@ quic-geyser-plugin = {path = "plugin", version="0.1.3"}
quic-geyser-server = {path = "server", version="0.1.3"}
quic-geyser-quiche-utils = {path = "quiche", version = "0.1.3"}
quic-geyser-blocking-client = {path = "blocking_client", version = "0.1.3"}
quic-geyser-block-builder = {path = "block-builder", version = "0.1.3"}
[profile.release]
debug = true

22
block-builder/Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "quic-geyser-block-builder"
version = "0.1.3"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
solana-sdk = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
quic-geyser-common = { workspace = true }
bincode = { workspace = true }
itertools = { workspace = true }
[dev-dependencies]
rand = { workspace = true }
tracing-subscriber = { workspace = true }
itertools = { workspace = true }
quic-geyser-server = { workspace = true }

View File

@ -0,0 +1,206 @@
use std::{
collections::{BTreeMap, HashMap},
sync::mpsc::{Receiver, Sender},
};
use itertools::Itertools;
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
compression::CompressionType,
types::{
account::Account, block::Block, block_meta::BlockMeta, slot_identifier::SlotIdentifier,
transaction::Transaction,
},
};
use solana_sdk::pubkey::Pubkey;
pub fn start_block_building_thread(
channel_messages: Receiver<ChannelMessage>,
output: Sender<ChannelMessage>,
compression_type: CompressionType,
) {
std::thread::spawn(move || {
build_blocks(channel_messages, output, compression_type);
});
}
#[derive(Default)]
struct PartialBlock {
meta: Option<BlockMeta>,
transactions: Vec<Transaction>,
account_updates: HashMap<Pubkey, AccountData>,
}
pub fn build_blocks(
channel_messages: Receiver<ChannelMessage>,
output: Sender<ChannelMessage>,
compression_type: CompressionType,
) {
let mut partially_build_blocks = BTreeMap::<u64, PartialBlock>::new();
while let Ok(channel_message) = channel_messages.recv() {
match channel_message {
ChannelMessage::Account(account_data, slot) => {
if let Some(lowest) = partially_build_blocks.first_entry() {
if *lowest.key() > slot {
log::error!("Account update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), slot);
}
}
// save account updates
let partial_block = match partially_build_blocks.get_mut(&slot) {
Some(pb) => pb,
None => {
partially_build_blocks.insert(slot, PartialBlock::default());
partially_build_blocks.get_mut(&slot).unwrap()
}
};
let update = match partial_block.account_updates.get(&account_data.pubkey) {
Some(prev_update) => prev_update.write_version < account_data.write_version,
None => true,
};
if update {
partial_block
.account_updates
.insert(account_data.pubkey, account_data);
}
}
ChannelMessage::Slot(slot, _parent_slot, commitment) => {
if commitment.is_confirmed() || commitment.is_finalized() {
// dispactch partially build blocks if not already dispatched
dispatch_partial_block(
&mut partially_build_blocks,
slot,
&output,
compression_type,
);
}
}
ChannelMessage::BlockMeta(meta) => {
let slot = meta.slot;
if let Some(lowest) = partially_build_blocks.first_entry() {
if *lowest.key() > meta.slot {
log::error!("Blockmeta update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), meta.slot);
}
}
// save account updates
let dispatch = {
let partial_block = match partially_build_blocks.get_mut(&meta.slot) {
Some(pb) => pb,
None => {
partially_build_blocks.insert(meta.slot, PartialBlock::default());
partially_build_blocks.get_mut(&meta.slot).unwrap()
}
};
let meta_tx_count = meta.executed_transaction_count;
if partial_block.meta.is_some() {
log::error!("Block meta has already been set");
} else {
partial_block.meta = Some(meta);
}
meta_tx_count == partial_block.transactions.len() as u64
};
if dispatch {
dispatch_partial_block(
&mut partially_build_blocks,
slot,
&output,
compression_type,
);
}
}
ChannelMessage::Transaction(transaction) => {
let slot = transaction.slot_identifier.slot;
if let Some(lowest) = partially_build_blocks.first_entry() {
if *lowest.key() > transaction.slot_identifier.slot {
log::error!("Transactions update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), slot);
}
}
// save account updates
let (transactions_length_in_pb, meta_transactions_count) = {
let partial_block = match partially_build_blocks.get_mut(&slot) {
Some(pb) => pb,
None => {
partially_build_blocks.insert(slot, PartialBlock::default());
partially_build_blocks.get_mut(&slot).unwrap()
}
};
partial_block.transactions.push(*transaction);
let meta_transactions_count = partial_block
.meta
.as_ref()
.map(|x| x.executed_transaction_count);
(
partial_block.transactions.len() as u64,
meta_transactions_count,
)
};
if Some(transactions_length_in_pb) == meta_transactions_count {
// check if all transactions are taken into account
dispatch_partial_block(
&mut partially_build_blocks,
slot,
&output,
compression_type,
);
}
}
ChannelMessage::Block(_) => {
unreachable!()
}
}
}
}
fn dispatch_partial_block(
partial_blocks: &mut BTreeMap<u64, PartialBlock>,
slot: u64,
output: &Sender<ChannelMessage>,
compression_type: CompressionType,
) {
if let Some(dispatched_partial_block) = partial_blocks.remove(&slot) {
let Some(meta) = dispatched_partial_block.meta else {
log::error!(
"Block was dispactched without any meta data/ cannot dispatch the block {slot}"
);
return;
};
let transactions = dispatched_partial_block.transactions;
if transactions.len() != meta.executed_transaction_count as usize {
log::error!(
"for block at slot {slot} transaction size mismatch {}!={}",
transactions.len(),
meta.executed_transaction_count
);
}
let accounts = dispatched_partial_block
.account_updates
.iter()
.map(|(pubkey, account_data)| {
let data_length = account_data.account.data.len() as u64;
Account {
slot_identifier: SlotIdentifier { slot },
pubkey: *pubkey,
owner: account_data.account.owner,
lamports: account_data.account.lamports,
executable: account_data.account.executable,
rent_epoch: account_data.account.rent_epoch,
write_version: account_data.write_version,
data: account_data.account.data.clone(),
compression_type: CompressionType::None,
data_length,
}
})
.collect_vec();
match Block::build(meta, transactions, accounts, compression_type) {
Ok(block) => {
log::info!("Dispatching block for slot {}", slot);
output.send(ChannelMessage::Block(block)).unwrap();
}
Err(e) => {
log::error!("block building failed because of error: {e}")
}
}
}
}

2
block-builder/src/lib.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod block_builder;
mod tests;

933
block-builder/src/tests.rs Normal file
View File

@ -0,0 +1,933 @@
#[cfg(test)]
mod tests {
use std::{
collections::HashMap,
sync::mpsc::{channel, TryRecvError},
thread::sleep,
time::Duration,
vec,
};
use itertools::Itertools;
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
types::{
block_meta::BlockMeta,
slot_identifier::SlotIdentifier,
transaction::{Transaction, TransactionMeta},
},
};
use solana_sdk::{
account::Account,
commitment_config::CommitmentConfig,
hash::Hash,
message::{
v0::{LoadedAddresses, Message as SolanaMessage},
MessageHeader,
},
pubkey::Pubkey,
signature::Signature,
};
use crate::block_builder::start_block_building_thread;
#[test]
fn test_block_creation_transactions_after_blockmeta() {
let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel();
start_block_building_thread(
cm_rx,
ms_sx,
quic_geyser_common::compression::CompressionType::None,
);
let acc1_pk = Pubkey::new_unique();
let acc1 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
);
let acc2 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
);
let acc3 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 2,
},
5,
);
let acc4 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 0,
},
5,
);
channelmsg_sx.send(acc1.clone()).unwrap();
channelmsg_sx.send(acc2.clone()).unwrap();
channelmsg_sx.send(acc3.clone()).unwrap();
channelmsg_sx.send(acc4.clone()).unwrap();
let block_meta = BlockMeta {
parent_slot: 4,
slot: 5,
parent_blockhash: Hash::new_unique().to_string(),
blockhash: Hash::new_unique().to_string(),
rewards: vec![],
block_height: Some(4),
executed_transaction_count: 2,
entries_count: 2,
};
channelmsg_sx
.send(ChannelMessage::BlockMeta(block_meta.clone()))
.unwrap();
let tx1 = Transaction {
slot_identifier: SlotIdentifier { slot: 5 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx1.clone())))
.unwrap();
let tx2 = Transaction {
slot_identifier: SlotIdentifier { slot: 6 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx2.clone())))
.unwrap();
let tx3 = Transaction {
slot_identifier: SlotIdentifier { slot: 5 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx3.clone())))
.unwrap();
let block_message = msg_rx.recv().unwrap();
let ChannelMessage::Block(block) = block_message else {
unreachable!();
};
let transactions = block.get_transactions().unwrap();
let accounts = block.get_accounts().unwrap();
let hash_map_accounts: HashMap<_, _> = accounts
.iter()
.map(|acc| (acc.pubkey, acc.data.clone()))
.collect();
let accounts_sent: HashMap<_, _> = [acc2, acc3]
.iter()
.map(|acc| {
let ChannelMessage::Account(acc, _) = acc else {
unreachable!();
};
(acc.pubkey, acc.account.data.clone())
})
.collect();
assert_eq!(block.meta, block_meta);
assert_eq!(transactions.len(), 2);
assert_eq!(transactions, vec![tx1, tx3]);
assert_eq!(hash_map_accounts, accounts_sent);
}
#[test]
fn test_block_creation_blockmeta_after_transactions() {
let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel();
start_block_building_thread(
cm_rx,
ms_sx,
quic_geyser_common::compression::CompressionType::None,
);
let acc1_pk = Pubkey::new_unique();
let acc1 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
);
let acc2 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
);
let acc3 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 2,
},
5,
);
let acc4 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 0,
},
5,
);
channelmsg_sx.send(acc1.clone()).unwrap();
channelmsg_sx.send(acc2.clone()).unwrap();
channelmsg_sx.send(acc3.clone()).unwrap();
channelmsg_sx.send(acc4.clone()).unwrap();
let block_meta = BlockMeta {
parent_slot: 4,
slot: 5,
parent_blockhash: Hash::new_unique().to_string(),
blockhash: Hash::new_unique().to_string(),
rewards: vec![],
block_height: Some(4),
executed_transaction_count: 2,
entries_count: 2,
};
let tx1 = Transaction {
slot_identifier: SlotIdentifier { slot: 5 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx1.clone())))
.unwrap();
let tx2 = Transaction {
slot_identifier: SlotIdentifier { slot: 6 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx2.clone())))
.unwrap();
let tx3 = Transaction {
slot_identifier: SlotIdentifier { slot: 5 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx3.clone())))
.unwrap();
channelmsg_sx
.send(ChannelMessage::BlockMeta(block_meta.clone()))
.unwrap();
let block_message = msg_rx.recv().unwrap();
let ChannelMessage::Block(block) = block_message else {
unreachable!();
};
let transactions = block.get_transactions().unwrap();
let accounts = block.get_accounts().unwrap();
let hash_map_accounts: HashMap<_, _> = accounts
.iter()
.map(|acc| (acc.pubkey, acc.data.clone()))
.collect();
let accounts_sent: HashMap<_, _> = [acc2, acc3]
.iter()
.map(|acc| {
let ChannelMessage::Account(acc, _) = acc else {
unreachable!();
};
(acc.pubkey, acc.account.data.clone())
})
.collect();
assert_eq!(block.meta, block_meta);
assert_eq!(transactions.len(), 2);
assert_eq!(transactions, vec![tx1, tx3]);
assert_eq!(hash_map_accounts, accounts_sent);
}
#[test]
fn test_block_creation_incomplete_block_after_slot_notification() {
let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel();
start_block_building_thread(
cm_rx,
ms_sx,
quic_geyser_common::compression::CompressionType::None,
);
let acc1_pk = Pubkey::new_unique();
let acc1 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
);
let acc2 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
);
let acc3 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 2,
},
5,
);
let acc4 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 0,
},
5,
);
channelmsg_sx.send(acc1.clone()).unwrap();
channelmsg_sx.send(acc2.clone()).unwrap();
channelmsg_sx.send(acc3.clone()).unwrap();
channelmsg_sx.send(acc4.clone()).unwrap();
let block_meta = BlockMeta {
parent_slot: 4,
slot: 5,
parent_blockhash: Hash::new_unique().to_string(),
blockhash: Hash::new_unique().to_string(),
rewards: vec![],
block_height: Some(4),
executed_transaction_count: 2,
entries_count: 2,
};
channelmsg_sx
.send(ChannelMessage::BlockMeta(block_meta.clone()))
.unwrap();
let tx1 = Transaction {
slot_identifier: SlotIdentifier { slot: 5 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx1.clone())))
.unwrap();
let tx2 = Transaction {
slot_identifier: SlotIdentifier { slot: 6 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx2.clone())))
.unwrap();
let tx3 = Transaction {
slot_identifier: SlotIdentifier { slot: 5 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx3.clone())))
.unwrap();
let block_message = msg_rx.recv().unwrap();
let ChannelMessage::Block(block) = block_message else {
unreachable!();
};
let transactions = block.get_transactions().unwrap();
let accounts = block.get_accounts().unwrap();
let hash_map_accounts: HashMap<_, _> = accounts
.iter()
.map(|acc| (acc.pubkey, acc.data.clone()))
.collect();
let accounts_sent: HashMap<_, _> = [acc2, acc3]
.iter()
.map(|acc| {
let ChannelMessage::Account(acc, _) = acc else {
unreachable!();
};
(acc.pubkey, acc.account.data.clone())
})
.collect();
assert_eq!(block.meta, block_meta);
assert_eq!(transactions.len(), 2);
assert_eq!(transactions, vec![tx1, tx3]);
assert_eq!(hash_map_accounts, accounts_sent);
}
#[test]
fn test_block_creation_incomplete_slot() {
let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel();
start_block_building_thread(
cm_rx,
ms_sx,
quic_geyser_common::compression::CompressionType::None,
);
let acc1_pk = Pubkey::new_unique();
let acc1 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
);
let acc2 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
);
let acc3 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 2,
},
5,
);
let acc4 = ChannelMessage::Account(
AccountData {
pubkey: acc1_pk,
account: Account {
lamports: 12345,
data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 0,
},
5,
);
channelmsg_sx.send(acc1.clone()).unwrap();
channelmsg_sx.send(acc2.clone()).unwrap();
channelmsg_sx.send(acc3.clone()).unwrap();
channelmsg_sx.send(acc4.clone()).unwrap();
let block_meta = BlockMeta {
parent_slot: 4,
slot: 5,
parent_blockhash: Hash::new_unique().to_string(),
blockhash: Hash::new_unique().to_string(),
rewards: vec![],
block_height: Some(4),
executed_transaction_count: 5,
entries_count: 2,
};
channelmsg_sx
.send(ChannelMessage::BlockMeta(block_meta.clone()))
.unwrap();
let tx1 = Transaction {
slot_identifier: SlotIdentifier { slot: 5 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx1.clone())))
.unwrap();
let tx2 = Transaction {
slot_identifier: SlotIdentifier { slot: 6 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx2.clone())))
.unwrap();
channelmsg_sx
.send(ChannelMessage::Slot(5, 4, CommitmentConfig::processed()))
.unwrap();
let tx3 = Transaction {
slot_identifier: SlotIdentifier { slot: 5 },
signatures: vec![Signature::new_unique()],
message: SolanaMessage {
header: MessageHeader {
num_required_signatures: 1,
num_readonly_signed_accounts: 0,
num_readonly_unsigned_accounts: 0,
},
account_keys: vec![acc1_pk],
recent_blockhash: Hash::new_unique(),
instructions: vec![],
address_table_lookups: vec![],
},
is_vote: false,
transasction_meta: TransactionMeta {
error: None,
fee: 0,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: None,
log_messages: Some(vec!["toto".to_string()]),
rewards: None,
loaded_addresses: LoadedAddresses {
writable: vec![],
readonly: vec![],
},
return_data: None,
compute_units_consumed: Some(1234),
},
index: 0,
};
channelmsg_sx
.send(ChannelMessage::Transaction(Box::new(tx3.clone())))
.unwrap();
sleep(Duration::from_millis(1));
assert_eq!(msg_rx.try_recv(), Err(TryRecvError::Empty));
channelmsg_sx
.send(ChannelMessage::Slot(5, 4, CommitmentConfig::confirmed()))
.unwrap();
sleep(Duration::from_millis(1));
let block_message = msg_rx.try_recv().unwrap();
let ChannelMessage::Block(block) = block_message else {
unreachable!();
};
let transactions = block.get_transactions().unwrap();
let accounts = block.get_accounts().unwrap();
let hash_map_accounts: HashMap<_, _> = accounts
.iter()
.map(|acc| (acc.pubkey, acc.data.clone()))
.collect();
let accounts_sent: HashMap<_, _> = [acc2, acc3]
.iter()
.map(|acc| {
let ChannelMessage::Account(acc, _) = acc else {
unreachable!();
};
(acc.pubkey, acc.account.data.clone())
})
.collect();
assert_eq!(block.meta, block_meta);
assert_eq!(transactions.len(), 2);
assert_eq!(transactions, vec![tx1, tx3]);
assert_eq!(hash_map_accounts, accounts_sent);
}
}

View File

@ -127,6 +127,7 @@ mod tests {
log_level: "debug".to_string(),
allow_accounts: true,
allow_accounts_at_startup: false,
enable_block_builder: false,
};
let quic_server = QuicServer::new(config).unwrap();
// wait for client to connect and subscribe

View File

@ -25,5 +25,6 @@ pub fn configure_client(
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2);
config.set_max_ack_delay(maximum_ack_delay);
config.set_ack_delay_exponent(ack_exponent);
config.enable_pacing(false);
Ok(config)
}

View File

@ -147,7 +147,7 @@ pub fn create_quiche_client_thread(
let mut instance = Instant::now();
'client: loop {
poll.poll(&mut events, Some(Duration::from_micros(100)))
poll.poll(&mut events, Some(Duration::from_millis(10)))
.unwrap();
if events.is_empty() {
connection.on_timeout();
@ -233,7 +233,10 @@ pub fn create_quiche_client_thread(
}
for stream_id in connection.writable() {
handle_writable(&mut connection, &mut partial_responses, stream_id);
if let Err(e) = handle_writable(&mut connection, &mut partial_responses, stream_id)
{
log::error!("Error writing message on writable stream : {e:?}");
}
}
if connection.is_closed() {
@ -303,7 +306,7 @@ mod tests {
let message_1 = ChannelMessage::Slot(
3,
2,
solana_sdk::commitment_config::CommitmentLevel::Confirmed,
solana_sdk::commitment_config::CommitmentConfig::confirmed(),
);
let message_2 = ChannelMessage::Account(
AccountData {
@ -375,9 +378,8 @@ mod tests {
rx_sent_queue,
CompressionType::Lz4Fast(8),
true,
100,
) {
println!("Server loop closed by error : {e}");
log::error!("Server loop closed by error : {e}");
}
});
@ -399,7 +401,7 @@ mod tests {
sx_recv_queue,
is_connected,
) {
println!("client stopped with error {e}");
log::error!("client stopped with error {e}");
}
});
client_sx_queue
@ -424,7 +426,7 @@ mod tests {
Message::SlotMsg(SlotMeta {
slot: 3,
parent: 2,
commitment_level: solana_sdk::commitment_config::CommitmentLevel::Confirmed
commitment_config: solana_sdk::commitment_config::CommitmentConfig::confirmed()
})
);

View File

@ -241,6 +241,7 @@ mod tests {
log_level: "debug".to_string(),
allow_accounts: true,
allow_accounts_at_startup: false,
enable_block_builder: false,
};
let quic_server = QuicServer::new(config).unwrap();
// wait for client to connect and subscribe
@ -266,7 +267,7 @@ mod tests {
})
};
// wait for server to start
sleep(Duration::from_secs(1));
sleep(Duration::from_millis(100));
// server started
let (client, mut reciever) = Client::new(
@ -282,6 +283,7 @@ mod tests {
.await
.unwrap();
client.subscribe(vec![Filter::AccountsAll]).await.unwrap();
sleep(Duration::from_millis(100));
let mut cnt = 0;
for message_sent in msgs {

View File

@ -13,6 +13,7 @@ log = { workspace = true }
thiserror = {workspace = true}
itertools = { workspace = true }
lz4 = { workspace = true }
bincode = { workspace = true }
[dev-dependencies]
rand = { workspace = true }

View File

@ -1,8 +1,8 @@
use solana_sdk::{
account::Account, clock::Slot, commitment_config::CommitmentLevel, pubkey::Pubkey,
account::Account, clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey,
};
use crate::types::{block_meta::BlockMeta, transaction::Transaction};
use crate::types::{block::Block, block_meta::BlockMeta, transaction::Transaction};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccountData {
@ -14,7 +14,8 @@ pub struct AccountData {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelMessage {
Account(AccountData, Slot),
Slot(u64, u64, CommitmentLevel),
Slot(u64, u64, CommitmentConfig),
BlockMeta(BlockMeta),
Transaction(Box<Transaction>),
Block(Block),
}

View File

@ -28,6 +28,8 @@ pub struct ConfigQuicPlugin {
pub allow_accounts: bool,
#[serde(default)]
pub allow_accounts_at_startup: bool,
#[serde(default = "ConfigQuicPlugin::default_enable_block_builder")]
pub enable_block_builder: bool,
}
impl ConfigQuicPlugin {
@ -46,6 +48,10 @@ impl ConfigQuicPlugin {
fn default_allow_accounts() -> bool {
true
}
fn default_enable_block_builder() -> bool {
true
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Copy)]

View File

@ -1,4 +1,4 @@
pub const DEFAULT_MAX_STREAMS: u64 = 96 * 1024;
pub const DEFAULT_MAX_STREAMS: u64 = 64;
pub const MAX_ALLOWED_PARTIAL_RESPONSES: u64 = DEFAULT_MAX_STREAMS - 1;
pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 256 * 1024 * 1024; // 256 MBs
pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10;

View File

@ -14,6 +14,7 @@ pub enum Filter {
BlockMeta,
Transaction(Signature),
TransactionsAll,
BlockAll,
}
impl Filter {
@ -33,6 +34,7 @@ impl Filter {
}
}
Filter::TransactionsAll => matches!(message, ChannelMessage::Transaction(_)),
Filter::BlockAll => matches!(message, ChannelMessage::Block(_)),
}
}
}

View File

@ -4,6 +4,7 @@ use crate::{
filters::Filter,
types::{
account::Account,
block::Block,
block_meta::{BlockMeta, SlotMeta},
transaction::Transaction,
},
@ -19,6 +20,6 @@ pub enum Message {
SlotMsg(SlotMeta),
BlockMetaMsg(BlockMeta),
TransactionMsg(Box<Transaction>),
BlockMsg(Block),
Filters(Vec<Filter>), // sent from client to server
Ping,
}

74
common/src/types/block.rs Normal file
View File

@ -0,0 +1,74 @@
use serde::{Deserialize, Serialize};
use crate::compression::CompressionType;
use super::{account::Account, block_meta::BlockMeta, transaction::Transaction};
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Block {
pub meta: BlockMeta,
pub transactions: Vec<u8>, // compressed transaction::Transaction
pub accounts_updated_in_block: Vec<u8>, // compressed account::Account
pub accounts_updated_count: u64,
pub compression_type: CompressionType,
}
impl Block {
pub fn build(
meta: BlockMeta,
transactions: Vec<Transaction>,
accounts: Vec<Account>,
compression_type: CompressionType,
) -> anyhow::Result<Self> {
let accounts_count = accounts.len() as u64;
let transactions_binary = bincode::serialize(&transactions)?;
let transactions_compressed = compression_type.compress(&transactions_binary);
let accounts_binary = bincode::serialize(&accounts)?;
let accounts_compressed = compression_type.compress(&accounts_binary);
Ok(Self {
meta,
transactions: transactions_compressed,
accounts_updated_in_block: accounts_compressed,
accounts_updated_count: accounts_count,
compression_type,
})
}
pub fn get_transactions(&self) -> anyhow::Result<Vec<Transaction>> {
let transactions = match self.compression_type {
CompressionType::None => bincode::deserialize::<Vec<Transaction>>(&self.transactions)?,
CompressionType::Lz4Fast(_) | CompressionType::Lz4(_) => {
let data = lz4::block::decompress(&self.transactions, None)?;
bincode::deserialize::<Vec<Transaction>>(&data)?
}
};
if transactions.len() != self.meta.executed_transaction_count as usize {
log::error!(
"transactions vector size is not equal to expected size in meta {} != {}",
transactions.len(),
self.meta.executed_transaction_count
);
}
Ok(transactions)
}
pub fn get_accounts(&self) -> anyhow::Result<Vec<Account>> {
let accounts = match self.compression_type {
CompressionType::None => {
bincode::deserialize::<Vec<Account>>(&self.accounts_updated_in_block)?
}
CompressionType::Lz4Fast(_) | CompressionType::Lz4(_) => {
let data = lz4::block::decompress(&self.accounts_updated_in_block, None)?;
bincode::deserialize::<Vec<Account>>(&data)?
}
};
if accounts.len() != self.accounts_updated_count as usize {
log::error!(
"accounts vector size is not equal to expected {} != {}",
accounts.len(),
self.accounts_updated_count
);
}
Ok(accounts)
}
}

View File

@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize};
use solana_sdk::commitment_config::CommitmentLevel;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_transaction_status::Reward;
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
@ -7,7 +7,7 @@ use solana_transaction_status::Reward;
pub struct SlotMeta {
pub slot: u64,
pub parent: u64,
pub commitment_level: CommitmentLevel,
pub commitment_config: CommitmentConfig,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]

View File

@ -1,4 +1,5 @@
pub mod account;
pub mod block;
pub mod block_meta;
pub mod connections_parameters;
pub mod slot_identifier;

View File

@ -9,7 +9,7 @@ use cli::Args;
use quic_geyser_client::non_blocking::client::Client;
use quic_geyser_common::{filters::Filter, types::connections_parameters::ConnectionParameters};
use solana_rpc_client::rpc_client::RpcClient;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::commitment_config::CommitmentConfig;
pub mod cli;
@ -51,11 +51,13 @@ pub async fn main() {
let account_notification = Arc::new(AtomicU64::new(0));
let blockmeta_notifications = Arc::new(AtomicU64::new(0));
let transaction_notifications = Arc::new(AtomicU64::new(0));
let block_notifications = Arc::new(AtomicU64::new(0));
let cluster_slot = Arc::new(AtomicU64::new(0));
let account_slot = Arc::new(AtomicU64::new(0));
let slot_slot = Arc::new(AtomicU64::new(0));
let blockmeta_slot = Arc::new(AtomicU64::new(0));
let block_slot = Arc::new(AtomicU64::new(0));
if let Some(rpc_url) = args.rpc_url {
let cluster_slot = cluster_slot.clone();
@ -76,11 +78,13 @@ pub async fn main() {
let blockmeta_notifications = blockmeta_notifications.clone();
let transaction_notifications = transaction_notifications.clone();
let total_accounts_size = total_accounts_size.clone();
let block_notifications = block_notifications.clone();
let cluster_slot = cluster_slot.clone();
let account_slot = account_slot.clone();
let slot_slot = slot_slot.clone();
let blockmeta_slot = blockmeta_slot.clone();
let block_slot = block_slot.clone();
std::thread::spawn(move || {
let mut max_byte_transfer_rate = 0;
loop {
@ -116,8 +120,12 @@ pub async fn main() {
" Transactions notified : {}",
transaction_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
" Blocks notified : {}",
block_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {} ", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed));
println!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {}, Block slot: {}", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed), block_slot.load(std::sync::atomic::Ordering::Relaxed));
}
});
}
@ -126,10 +134,10 @@ pub async fn main() {
println!("Subscribing");
client
.subscribe(vec![
Filter::AccountsAll,
Filter::TransactionsAll,
Filter::BlockAll,
Filter::Slot,
Filter::BlockMeta,
Filter::AccountsAll,
])
.await
.unwrap();
@ -168,7 +176,7 @@ pub async fn main() {
quic_geyser_common::message::Message::SlotMsg(slot) => {
log::trace!("got slot notification : {} ", slot.slot);
slot_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if slot.commitment_level == CommitmentLevel::Processed {
if slot.commitment_config == CommitmentConfig::processed() {
slot_slot.store(slot.slot, std::sync::atomic::Ordering::Relaxed);
}
}
@ -184,12 +192,14 @@ pub async fn main() {
);
transaction_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::BlockMsg(block) => {
log::info!("got block notification of slot {}, number_of_transactions : {}, number_of_accounts: {}", block.meta.slot, block.get_transactions().unwrap().len(), block.get_accounts().unwrap().len());
block_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
block_slot.store(block.meta.slot, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::Filters(_) => {
// Not supported
}
quic_geyser_common::message::Message::Ping => {
// not supported ping
}
}
}
}

View File

@ -31,6 +31,7 @@ pub fn main() {
number_of_retries: 100,
allow_accounts: true,
allow_accounts_at_startup: false,
enable_block_builder: true,
};
let quic_server = QuicServer::new(config).unwrap();

View File

@ -25,6 +25,7 @@ thiserror = {workspace = true}
quic-geyser-common = { workspace = true }
quic-geyser-server = { workspace = true }
quic-geyser-block-builder = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }

View File

@ -2,6 +2,7 @@ use agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
};
use quic_geyser_block_builder::block_builder::start_block_building_thread;
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
plugin_error::QuicGeyserError,
@ -13,7 +14,7 @@ use quic_geyser_common::{
};
use quic_geyser_server::quic_server::QuicServer;
use solana_sdk::{
account::Account, clock::Slot, commitment_config::CommitmentLevel, message::v0::Message,
account::Account, clock::Slot, commitment_config::CommitmentConfig, message::v0::Message,
pubkey::Pubkey,
};
@ -22,6 +23,7 @@ use crate::config::Config;
#[derive(Debug, Default)]
pub struct QuicGeyserPlugin {
quic_server: Option<QuicServer>,
block_builder_channel: Option<std::sync::mpsc::Sender<ChannelMessage>>,
}
impl GeyserPlugin for QuicGeyserPlugin {
@ -32,11 +34,22 @@ impl GeyserPlugin for QuicGeyserPlugin {
fn on_load(&mut self, config_file: &str) -> PluginResult<()> {
log::info!("loading quic_geyser plugin");
let config = Config::load_from_file(config_file)?;
let compression_type = config.quic_plugin.compression_parameters.compression_type;
let enable_block_builder = config.quic_plugin.enable_block_builder;
log::info!("Quic plugin config correctly loaded");
solana_logger::setup_with_default(&config.quic_plugin.log_level);
let quic_server = QuicServer::new(config.quic_plugin).map_err(|_| {
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
})?;
if enable_block_builder {
let (sx, rx) = std::sync::mpsc::channel();
start_block_building_thread(
rx,
quic_server.data_channel_sender.clone(),
compression_type,
);
self.block_builder_channel = Some(sx);
}
self.quic_server = Some(quic_server);
Ok(())
@ -75,15 +88,21 @@ impl GeyserPlugin for QuicGeyserPlugin {
};
let pubkey: Pubkey = Pubkey::try_from(account_info.pubkey).expect("valid pubkey");
let channel_message = ChannelMessage::Account(
AccountData {
pubkey,
account,
write_version: account_info.write_version,
},
slot,
);
if let Some(block_channel) = &self.block_builder_channel {
let _ = block_channel.send(channel_message.clone());
}
quic_server
.send_message(ChannelMessage::Account(
AccountData {
pubkey,
account,
write_version: account_info.write_version,
},
slot,
))
.send_message(channel_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
Ok(())
}
@ -103,11 +122,16 @@ impl GeyserPlugin for QuicGeyserPlugin {
return Ok(());
};
let commitment_level = match status {
SlotStatus::Processed => CommitmentLevel::Processed,
SlotStatus::Rooted => CommitmentLevel::Finalized,
SlotStatus::Confirmed => CommitmentLevel::Confirmed,
SlotStatus::Processed => CommitmentConfig::processed(),
SlotStatus::Rooted => CommitmentConfig::finalized(),
SlotStatus::Confirmed => CommitmentConfig::confirmed(),
};
let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level);
if let Some(block_channel) = &self.block_builder_channel {
let _ = block_channel.send(slot_message.clone());
}
quic_server
.send_message(slot_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
@ -173,6 +197,11 @@ impl GeyserPlugin for QuicGeyserPlugin {
};
let transaction_message = ChannelMessage::Transaction(Box::new(transaction));
if let Some(block_channel) = &self.block_builder_channel {
let _ = block_channel.send(transaction_message.clone());
}
quic_server
.send_message(transaction_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
@ -206,8 +235,14 @@ impl GeyserPlugin for QuicGeyserPlugin {
entries_count: blockinfo.entry_count,
};
let block_meta_message = ChannelMessage::BlockMeta(block_meta);
if let Some(block_channel) = &self.block_builder_channel {
let _ = block_channel.send(block_meta_message.clone());
}
quic_server
.send_message(ChannelMessage::BlockMeta(block_meta))
.send_message(block_meta_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
Ok(())
}

View File

@ -58,6 +58,7 @@ pub fn main() -> anyhow::Result<()> {
number_of_retries: 100,
allow_accounts: true,
allow_accounts_at_startup: false,
enable_block_builder: false,
};
let (server_sender, server_reciever) = std::sync::mpsc::channel::<ChannelMessage>();
@ -94,7 +95,7 @@ pub fn main() -> anyhow::Result<()> {
quic_geyser_common::message::Message::SlotMsg(slot_message) => ChannelMessage::Slot(
slot_message.slot,
slot_message.parent,
slot_message.commitment_level,
slot_message.commitment_config,
),
quic_geyser_common::message::Message::BlockMetaMsg(block_meta_message) => {
ChannelMessage::BlockMeta(BlockMeta {

View File

@ -20,7 +20,7 @@ pub fn recv_message(
None => vec![],
};
loop {
let mut buf = [0; MAX_DATAGRAM_SIZE]; // 10kk buffer size
let mut buf = [0; MAX_DATAGRAM_SIZE];
match connection.stream_recv(stream_id, &mut buf) {
Ok((read, fin)) => {
log::trace!("read {} on stream {}", read, stream_id);

View File

@ -6,6 +6,7 @@ pub fn convert_to_binary(message: &Message) -> anyhow::Result<Vec<u8>> {
Ok(bincode::serialize(&message)?)
}
// return if connection has finished writing
pub fn send_message(
connection: &mut Connection,
partial_responses: &mut PartialResponses,
@ -36,31 +37,38 @@ pub fn handle_writable(
conn: &mut quiche::Connection,
partial_responses: &mut PartialResponses,
stream_id: u64,
) -> bool {
) -> std::result::Result<(), quiche::Error> {
log::trace!("{} stream {} is writable", conn.trace_id(), stream_id);
if !partial_responses.contains_key(&stream_id) {
return false;
}
let resp = partial_responses
.get_mut(&stream_id)
.expect("should have a stream id");
let resp = match partial_responses.get_mut(&stream_id) {
Some(s) => s,
None => {
// stream has finished
let _ = conn.stream_send(stream_id, b"", true);
return Ok(());
}
};
let body = &resp.binary;
let written = match conn.stream_send(stream_id, body, true) {
Ok(v) => v,
Err(quiche::Error::Done) => 0,
Err(quiche::Error::Done) => {
// done writing
return Ok(());
}
Err(e) => {
partial_responses.remove(&stream_id);
log::error!("{} stream send failed {:?}", conn.trace_id(), e);
return false;
log::error!(
"{} stream id :{stream_id} send failed {e:?}",
conn.trace_id()
);
return Err(e);
}
};
if written == 0 {
return false;
return Ok(());
}
if written == resp.binary.len() {
@ -77,5 +85,5 @@ pub fn handle_writable(
resp.binary = resp.binary[written..].to_vec();
resp.written += written;
}
true
Ok(())
}

View File

@ -7,7 +7,7 @@ use quic_geyser_common::{
use super::quiche_server_loop::server_loop;
pub struct QuicServer {
data_channel_sender: mpsc::Sender<ChannelMessage>,
pub data_channel_sender: mpsc::Sender<ChannelMessage>,
pub quic_plugin_config: ConfigQuicPlugin,
}
@ -19,7 +19,6 @@ impl Debug for QuicServer {
impl QuicServer {
pub fn new(config: ConfigQuicPlugin) -> anyhow::Result<Self> {
let max_number_of_streams = config.quic_parameters.max_number_of_streams_per_client;
let server_config = configure_server(config.quic_parameters)?;
let socket = config.address;
let compression_type = config.compression_parameters.compression_type;
@ -33,7 +32,6 @@ impl QuicServer {
data_channel_tx,
compression_type,
true,
max_number_of_streams,
) {
panic!("Server loop closed by error : {e}");
}

View File

@ -32,7 +32,6 @@ use quic_geyser_quiche_utils::{
struct DispatchingData {
pub sender: Sender<(Vec<u8>, u8)>,
pub filters: Arc<RwLock<Vec<Filter>>>,
pub message_counter: Arc<AtomicU64>,
}
type DispachingConnections = Arc<Mutex<HashMap<ConnectionId<'static>, DispatchingData>>>;
@ -50,7 +49,6 @@ pub fn server_loop(
message_send_queue: mpsc::Receiver<ChannelMessage>,
compression_type: CompressionType,
stop_laggy_client: bool,
max_number_of_streams: u64,
) -> anyhow::Result<()> {
let maximum_concurrent_streams_id = u64::MAX;
@ -94,11 +92,10 @@ pub fn server_loop(
message_send_queue,
dispatching_connections.clone(),
compression_type,
max_number_of_streams,
);
loop {
poll.poll(&mut events, Some(Duration::from_micros(100)))?;
poll.poll(&mut events, Some(Duration::from_millis(10)))?;
'read: loop {
let (len, from) = match socket.recv_from(&mut buf) {
Ok(v) => v,
@ -210,9 +207,7 @@ pub fn server_loop(
};
let (client_sender, client_reciver) = mio_channel::channel();
let (client_message_sx, client_message_rx) = mpsc::channel();
let message_counter = Arc::new(AtomicU64::new(0));
let filters = Arc::new(RwLock::new(Vec::new()));
create_client_task(
@ -223,7 +218,6 @@ pub fn server_loop(
filters.clone(),
maximum_concurrent_streams_id,
stop_laggy_client,
message_counter.clone(),
);
let mut lk = dispatching_connections.lock().unwrap();
lk.insert(
@ -231,7 +225,6 @@ pub fn server_loop(
DispatchingData {
sender: client_message_sx,
filters,
message_counter,
},
);
clients.insert(scid, client_sender);
@ -302,7 +295,6 @@ fn create_client_task(
filters: Arc<RwLock<Vec<Filter>>>,
maximum_concurrent_streams_id: u64,
stop_laggy_client: bool,
message_count: Arc<AtomicU64>,
) {
std::thread::spawn(move || {
let mut partial_responses = PartialResponses::new();
@ -340,29 +332,29 @@ fn create_client_task(
std::thread::spawn(move || {
while !quit.load(std::sync::atomic::Ordering::Relaxed) {
std::thread::sleep(Duration::from_secs(1));
println!("---------------------------------");
println!(
log::info!("---------------------------------");
log::info!(
"number of loop : {}",
number_of_loops.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
log::info!(
"number of packets read : {}",
number_of_meesages_from_network
.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
log::info!(
"number of packets write : {}",
number_of_meesages_to_network.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
log::info!(
"number_of_readable_streams : {}",
number_of_readable_streams.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
log::info!(
"number_of_writable_streams : {}",
number_of_writable_streams.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
log::info!(
"messages_added : {}",
messages_added.swap(0, std::sync::atomic::Ordering::Relaxed)
);
@ -403,9 +395,6 @@ fn create_client_task(
let mut filter_lk = filters.write().unwrap();
filter_lk.append(&mut f);
}
Message::Ping => {
// got ping
}
_ => {
log::error!("unknown message from the client");
}
@ -418,18 +407,22 @@ fn create_client_task(
}
}
for stream_id in connection.writable() {
number_of_writable_streams.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
handle_writable(&mut connection, &mut partial_responses, stream_id);
}
if !connection.is_closed()
&& (connection.is_established() || connection.is_in_early_data())
{
for stream_id in connection.writable() {
number_of_writable_streams.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Err(e) =
handle_writable(&mut connection, &mut partial_responses, stream_id)
{
log::error!("Error writing {e:?}");
}
}
if !connection.is_closed() && connection.is_established() {
while partial_responses.len() < max_allowed_partial_responses {
let close = match message_channel.try_recv() {
Ok((message, priority)) => {
message_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
let stream_id = next_stream;
next_stream =
get_next_unidi(stream_id, true, maximum_concurrent_streams_id);
@ -444,13 +437,22 @@ fn create_client_task(
true
} else {
messages_added.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
send_message(
match send_message(
&mut connection,
&mut partial_responses,
stream_id,
&message,
)
.is_err()
) {
Ok(_) => false,
Err(quiche::Error::Done) => {
// done writing / queue is full
break;
}
Err(e) => {
log::error!("error sending message : {e:?}");
true
}
}
}
}
Err(e) => {
@ -460,6 +462,7 @@ fn create_client_task(
}
mpsc::TryRecvError::Disconnected => {
// too many message the connection is lagging
log::error!("channel disconnected by dispatcher");
true
}
}
@ -524,7 +527,6 @@ fn create_dispatching_thread(
message_send_queue: mpsc::Receiver<ChannelMessage>,
dispatching_connections: DispachingConnections,
compression_type: CompressionType,
max_number_of_streams: u64,
) {
std::thread::spawn(move || {
while let Ok(message) = message_send_queue.recv() {
@ -554,13 +556,13 @@ fn create_dispatching_thread(
account.write_version,
);
(Message::AccountMsg(geyser_account), 4)
(Message::AccountMsg(geyser_account), 3)
}
ChannelMessage::Slot(slot, parent, commitment_level) => (
ChannelMessage::Slot(slot, parent, commitment_config) => (
Message::SlotMsg(SlotMeta {
slot,
parent,
commitment_level,
commitment_config,
}),
1,
),
@ -568,25 +570,18 @@ fn create_dispatching_thread(
ChannelMessage::Transaction(transaction) => {
(Message::TransactionMsg(transaction), 3)
}
ChannelMessage::Block(block) => (Message::BlockMsg(block), 2),
};
let binary =
bincode::serialize(&message).expect("Message should be serializable in binary");
for id in dispatching_connections.iter() {
let data = dispatching_connections_lk.get(id).unwrap();
data.message_counter
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if data.sender.send((binary.clone(), priority)).is_err() {
// client is closed
dispatching_connections_lk.remove(id);
}
}
}
dispatching_connections_lk.retain(|_id, connection| {
connection
.message_counter
.load(std::sync::atomic::Ordering::Relaxed)
< max_number_of_streams
});
}
});
}