many minor changes

This commit is contained in:
godmodegalactus 2024-07-10 18:02:46 +02:00
parent fc33f8f65e
commit 471c20686b
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
17 changed files with 420 additions and 75 deletions

351
Cargo.lock generated
View File

@ -1060,6 +1060,19 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "der"
version = "0.5.1"
@ -2045,6 +2058,67 @@ dependencies = [
"thiserror",
]
[[package]]
name = "lite-account-manager-common"
version = "0.1.0"
source = "git+https://github.com/blockworks-foundation/solana-lite-account-manager.git?branch=main#6c4db602b14c34f57a2ab933936f29f30dfde017"
dependencies = [
"anyhow",
"async-trait",
"base64 0.21.7",
"bs58",
"itertools",
"lz4",
"serde",
"serde_json",
"solana-rpc-client-api",
"solana-sdk",
"tokio",
"zstd",
]
[[package]]
name = "lite-account-storage"
version = "0.1.0"
source = "git+https://github.com/blockworks-foundation/solana-lite-account-manager.git?branch=main#6c4db602b14c34f57a2ab933936f29f30dfde017"
dependencies = [
"async-trait",
"bincode",
"dashmap",
"futures",
"itertools",
"lazy_static",
"lite-account-manager-common",
"log",
"prometheus",
"solana-sdk",
"tokio",
]
[[package]]
name = "lite-token-account-storage"
version = "0.1.0"
source = "git+https://github.com/blockworks-foundation/solana-lite-account-manager.git?branch=main#6c4db602b14c34f57a2ab933936f29f30dfde017"
dependencies = [
"anyhow",
"async-trait",
"bincode",
"dashmap",
"futures",
"itertools",
"lazy_static",
"lite-account-manager-common",
"log",
"lz4",
"prometheus",
"serde",
"solana-program",
"solana-sdk",
"spl-token",
"spl-token-2022 3.0.2",
"tokio",
]
[[package]]
name = "litemap"
version = "0.7.3"
@ -2618,6 +2692,27 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prometheus"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf",
"thiserror",
]
[[package]]
name = "protobuf"
version = "2.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
[[package]]
name = "qstring"
version = "0.7.2"
@ -2794,6 +2889,27 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "quic-geyser-snapshot"
version = "0.1.5"
dependencies = [
"anyhow",
"bincode",
"itertools",
"lite-account-manager-common",
"lite-account-storage",
"lite-token-account-storage",
"log",
"lz4",
"quic-geyser-common",
"serde",
"serde_json",
"solana-program",
"solana-sdk",
"solana-transaction-status",
"thiserror",
]
[[package]]
name = "quic-plugin-tester-client"
version = "0.1.5"
@ -3507,9 +3623,9 @@ dependencies = [
"solana-config-program",
"solana-sdk",
"spl-token",
"spl-token-2022",
"spl-token-group-interface",
"spl-token-metadata-interface",
"spl-token-2022 1.0.0",
"spl-token-group-interface 0.1.0",
"spl-token-metadata-interface 0.2.0",
"thiserror",
"zstd",
]
@ -3728,7 +3844,7 @@ dependencies = [
"solana-sdk",
"solana-transaction-status",
"solana-version",
"spl-token-2022",
"spl-token-2022 1.0.0",
"thiserror",
]
@ -3827,7 +3943,7 @@ dependencies = [
"spl-associated-token-account",
"spl-memo",
"spl-token",
"spl-token-2022",
"spl-token-2022 1.0.0",
"thiserror",
]
@ -3950,7 +4066,7 @@ dependencies = [
"num-traits",
"solana-program",
"spl-token",
"spl-token-2022",
"spl-token-2022 1.0.0",
"thiserror",
]
@ -3962,7 +4078,18 @@ checksum = "daa600f2fe56f32e923261719bae640d873edadbc5237681a39b8e37bfd4d263"
dependencies = [
"bytemuck",
"solana-program",
"spl-discriminator-derive",
"spl-discriminator-derive 0.1.2",
]
[[package]]
name = "spl-discriminator"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34d1814406e98b08c5cd02c1126f83fd407ad084adce0b05fda5730677822eac"
dependencies = [
"bytemuck",
"solana-program",
"spl-discriminator-derive 0.2.0",
]
[[package]]
@ -3972,7 +4099,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07fd7858fc4ff8fb0e34090e41d7eb06a823e1057945c26d480bfc21d2338a93"
dependencies = [
"quote",
"spl-discriminator-syn",
"spl-discriminator-syn 0.1.2",
"syn 2.0.66",
]
[[package]]
name = "spl-discriminator-derive"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9e8418ea6269dcfb01c712f0444d2c75542c04448b480e87de59d2865edc750"
dependencies = [
"quote",
"spl-discriminator-syn 0.2.0",
"syn 2.0.66",
]
@ -3989,6 +4127,19 @@ dependencies = [
"thiserror",
]
[[package]]
name = "spl-discriminator-syn"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c1f05593b7ca9eac7caca309720f2eafb96355e037e6d373b909a80fe7b69b9"
dependencies = [
"proc-macro2",
"quote",
"sha2 0.10.8",
"syn 2.0.66",
"thiserror",
]
[[package]]
name = "spl-memo"
version = "4.0.0"
@ -4008,7 +4159,20 @@ dependencies = [
"bytemuck",
"solana-program",
"solana-zk-token-sdk",
"spl-program-error",
"spl-program-error 0.3.1",
]
[[package]]
name = "spl-pod"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "046ce669f48cf2eca1ec518916d8725596bfb655beb1c74374cf71dc6cb773c9"
dependencies = [
"borsh 1.5.1",
"bytemuck",
"solana-program",
"solana-zk-token-sdk",
"spl-program-error 0.4.1",
]
[[package]]
@ -4020,7 +4184,20 @@ dependencies = [
"num-derive 0.4.2",
"num-traits",
"solana-program",
"spl-program-error-derive",
"spl-program-error-derive 0.3.2",
"thiserror",
]
[[package]]
name = "spl-program-error"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49065093ea91f57b9b2bd81493ff705e2ad4e64507a07dbc02b085778e02770e"
dependencies = [
"num-derive 0.4.2",
"num-traits",
"solana-program",
"spl-program-error-derive 0.4.1",
"thiserror",
]
@ -4036,6 +4213,18 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "spl-program-error-derive"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d375dd76c517836353e093c2dbb490938ff72821ab568b545fd30ab3256b3e"
dependencies = [
"proc-macro2",
"quote",
"sha2 0.10.8",
"syn 2.0.66",
]
[[package]]
name = "spl-tlv-account-resolution"
version = "0.5.2"
@ -4044,10 +4233,24 @@ checksum = "56f335787add7fa711819f9e7c573f8145a5358a709446fe2d24bf2a88117c90"
dependencies = [
"bytemuck",
"solana-program",
"spl-discriminator",
"spl-pod",
"spl-program-error",
"spl-type-length-value",
"spl-discriminator 0.1.1",
"spl-pod 0.1.1",
"spl-program-error 0.3.1",
"spl-type-length-value 0.3.1",
]
[[package]]
name = "spl-tlv-account-resolution"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cace91ba08984a41556efe49cbf2edca4db2f577b649da7827d3621161784bf8"
dependencies = [
"bytemuck",
"solana-program",
"spl-discriminator 0.2.2",
"spl-pod 0.2.2",
"spl-program-error 0.4.1",
"spl-type-length-value 0.4.3",
]
[[package]]
@ -4080,12 +4283,36 @@ dependencies = [
"solana-security-txt",
"solana-zk-token-sdk",
"spl-memo",
"spl-pod",
"spl-pod 0.1.1",
"spl-token",
"spl-token-group-interface",
"spl-token-metadata-interface",
"spl-transfer-hook-interface",
"spl-type-length-value",
"spl-token-group-interface 0.1.0",
"spl-token-metadata-interface 0.2.0",
"spl-transfer-hook-interface 0.4.1",
"spl-type-length-value 0.3.1",
"thiserror",
]
[[package]]
name = "spl-token-2022"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5412f99ae7ee6e0afde00defaa354e6228e47e30c0e3adf553e2e01e6abb584"
dependencies = [
"arrayref",
"bytemuck",
"num-derive 0.4.2",
"num-traits",
"num_enum 0.7.2",
"solana-program",
"solana-security-txt",
"solana-zk-token-sdk",
"spl-memo",
"spl-pod 0.2.2",
"spl-token",
"spl-token-group-interface 0.2.3",
"spl-token-metadata-interface 0.3.3",
"spl-transfer-hook-interface 0.6.3",
"spl-type-length-value 0.4.3",
"thiserror",
]
@ -4097,9 +4324,22 @@ checksum = "b889509d49fa74a4a033ca5dae6c2307e9e918122d97e58562f5c4ffa795c75d"
dependencies = [
"bytemuck",
"solana-program",
"spl-discriminator",
"spl-pod",
"spl-program-error",
"spl-discriminator 0.1.1",
"spl-pod 0.1.1",
"spl-program-error 0.3.1",
]
[[package]]
name = "spl-token-group-interface"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d419b5cfa3ee8e0f2386fd7e02a33b3ec8a7db4a9c7064a2ea24849dc4a273b6"
dependencies = [
"bytemuck",
"solana-program",
"spl-discriminator 0.2.2",
"spl-pod 0.2.2",
"spl-program-error 0.4.1",
]
[[package]]
@ -4110,10 +4350,24 @@ checksum = "4c16ce3ba6979645fb7627aa1e435576172dd63088dc7848cb09aa331fa1fe4f"
dependencies = [
"borsh 0.10.3",
"solana-program",
"spl-discriminator",
"spl-pod",
"spl-program-error",
"spl-type-length-value",
"spl-discriminator 0.1.1",
"spl-pod 0.1.1",
"spl-program-error 0.3.1",
"spl-type-length-value 0.3.1",
]
[[package]]
name = "spl-token-metadata-interface"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30179c47e93625680dabb620c6e7931bd12d62af390f447bc7beb4a3a9b5feee"
dependencies = [
"borsh 1.5.1",
"solana-program",
"spl-discriminator 0.2.2",
"spl-pod 0.2.2",
"spl-program-error 0.4.1",
"spl-type-length-value 0.4.3",
]
[[package]]
@ -4125,11 +4379,27 @@ dependencies = [
"arrayref",
"bytemuck",
"solana-program",
"spl-discriminator",
"spl-pod",
"spl-program-error",
"spl-tlv-account-resolution",
"spl-type-length-value",
"spl-discriminator 0.1.1",
"spl-pod 0.1.1",
"spl-program-error 0.3.1",
"spl-tlv-account-resolution 0.5.2",
"spl-type-length-value 0.3.1",
]
[[package]]
name = "spl-transfer-hook-interface"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66a98359769cd988f7b35c02558daa56d496a7e3bd8626e61f90a7c757eedb9b"
dependencies = [
"arrayref",
"bytemuck",
"solana-program",
"spl-discriminator 0.2.2",
"spl-pod 0.2.2",
"spl-program-error 0.4.1",
"spl-tlv-account-resolution 0.6.3",
"spl-type-length-value 0.4.3",
]
[[package]]
@ -4140,9 +4410,22 @@ checksum = "8f9ebd75d29c5f48de5f6a9c114e08531030b75b8ac2c557600ac7da0b73b1e8"
dependencies = [
"bytemuck",
"solana-program",
"spl-discriminator",
"spl-pod",
"spl-program-error",
"spl-discriminator 0.1.1",
"spl-pod 0.1.1",
"spl-program-error 0.3.1",
]
[[package]]
name = "spl-type-length-value"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "422ce13429dbd41d2cee8a73931c05fda0b0c8ca156a8b0c19445642550bb61a"
dependencies = [
"bytemuck",
"solana-program",
"spl-discriminator 0.2.2",
"spl-pod 0.2.2",
"spl-program-error 0.4.1",
]
[[package]]

View File

@ -11,6 +11,7 @@ members = [
"examples/tester-server",
"proxy",
"block-builder",
"snapshot",
]
[workspace.package]
@ -65,6 +66,12 @@ quic-geyser-server = {path = "server", version="0.1.5"}
quic-geyser-quiche-utils = {path = "quiche", version = "0.1.5"}
quic-geyser-blocking-client = {path = "blocking_client", version = "0.1.5"}
quic-geyser-block-builder = {path = "block-builder", version = "0.1.5"}
quic-geyser-snapshot = {path = "snapshot", version = "0.1.5"}
# solana lite account manager for snapshot creation
lite-account-manager-common = { git = "https://github.com/blockworks-foundation/solana-lite-account-manager.git", branch = "main" }
lite-account-storage = { git = "https://github.com/blockworks-foundation/solana-lite-account-manager.git", branch = "main" }
lite-token-account-storage = { git = "https://github.com/blockworks-foundation/solana-lite-account-manager.git", branch = "main" }
[profile.release]
debug = true

View File

@ -46,7 +46,11 @@ pub fn build_blocks(
let mut partially_build_blocks = BTreeMap::<u64, PartialBlock>::new();
while let Ok(channel_message) = channel_messages.recv() {
match channel_message {
ChannelMessage::Account(account_data, slot) => {
ChannelMessage::Account(account_data, slot, init) => {
if init {
continue;
}
if build_blocks_with_accounts {
if let Some(lowest) = partially_build_blocks.first_entry() {
if *lowest.key() > slot {

View File

@ -54,6 +54,7 @@ fn test_block_creation_transactions_after_blockmeta() {
write_version: 1,
},
5,
false,
);
let acc2 = ChannelMessage::Account(
AccountData {
@ -68,6 +69,7 @@ fn test_block_creation_transactions_after_blockmeta() {
write_version: 1,
},
5,
false,
);
let acc3 = ChannelMessage::Account(
@ -83,6 +85,7 @@ fn test_block_creation_transactions_after_blockmeta() {
write_version: 2,
},
5,
false,
);
let acc4 = ChannelMessage::Account(
@ -98,6 +101,7 @@ fn test_block_creation_transactions_after_blockmeta() {
write_version: 0,
},
5,
false,
);
channelmsg_sx.send(acc1.clone()).unwrap();
channelmsg_sx.send(acc2.clone()).unwrap();
@ -241,7 +245,7 @@ fn test_block_creation_transactions_after_blockmeta() {
let accounts_sent: HashMap<_, _> = [acc2, acc3]
.iter()
.map(|acc| {
let ChannelMessage::Account(acc, _) = acc else {
let ChannelMessage::Account(acc, ..) = acc else {
unreachable!();
};
(acc.pubkey, acc.account.data.clone())
@ -278,6 +282,7 @@ fn test_block_creation_blockmeta_after_transactions() {
write_version: 1,
},
5,
false,
);
let acc2 = ChannelMessage::Account(
AccountData {
@ -292,6 +297,7 @@ fn test_block_creation_blockmeta_after_transactions() {
write_version: 1,
},
5,
false,
);
let acc3 = ChannelMessage::Account(
@ -307,6 +313,7 @@ fn test_block_creation_blockmeta_after_transactions() {
write_version: 2,
},
5,
false,
);
let acc4 = ChannelMessage::Account(
@ -322,6 +329,7 @@ fn test_block_creation_blockmeta_after_transactions() {
write_version: 0,
},
5,
false,
);
channelmsg_sx.send(acc1.clone()).unwrap();
channelmsg_sx.send(acc2.clone()).unwrap();
@ -466,7 +474,7 @@ fn test_block_creation_blockmeta_after_transactions() {
let accounts_sent: HashMap<_, _> = [acc2, acc3]
.iter()
.map(|acc| {
let ChannelMessage::Account(acc, _) = acc else {
let ChannelMessage::Account(acc, ..) = acc else {
unreachable!();
};
(acc.pubkey, acc.account.data.clone())
@ -503,6 +511,7 @@ fn test_block_creation_incomplete_block_after_slot_notification() {
write_version: 1,
},
5,
false,
);
let acc2 = ChannelMessage::Account(
AccountData {
@ -517,6 +526,7 @@ fn test_block_creation_incomplete_block_after_slot_notification() {
write_version: 1,
},
5,
false,
);
let acc3 = ChannelMessage::Account(
@ -532,6 +542,7 @@ fn test_block_creation_incomplete_block_after_slot_notification() {
write_version: 2,
},
5,
false,
);
let acc4 = ChannelMessage::Account(
@ -547,6 +558,7 @@ fn test_block_creation_incomplete_block_after_slot_notification() {
write_version: 0,
},
5,
false,
);
channelmsg_sx.send(acc1.clone()).unwrap();
channelmsg_sx.send(acc2.clone()).unwrap();
@ -690,7 +702,7 @@ fn test_block_creation_incomplete_block_after_slot_notification() {
let accounts_sent: HashMap<_, _> = [acc2, acc3]
.iter()
.map(|acc| {
let ChannelMessage::Account(acc, _) = acc else {
let ChannelMessage::Account(acc, ..) = acc else {
unreachable!();
};
(acc.pubkey, acc.account.data.clone())
@ -727,6 +739,7 @@ fn test_block_creation_incomplete_slot() {
write_version: 1,
},
5,
false,
);
let acc2 = ChannelMessage::Account(
AccountData {
@ -741,6 +754,7 @@ fn test_block_creation_incomplete_slot() {
write_version: 1,
},
5,
false,
);
let acc3 = ChannelMessage::Account(
@ -756,6 +770,7 @@ fn test_block_creation_incomplete_slot() {
write_version: 2,
},
5,
false,
);
let acc4 = ChannelMessage::Account(
@ -771,6 +786,7 @@ fn test_block_creation_incomplete_slot() {
write_version: 0,
},
5,
false,
);
channelmsg_sx.send(acc1.clone()).unwrap();
channelmsg_sx.send(acc2.clone()).unwrap();
@ -925,7 +941,7 @@ fn test_block_creation_incomplete_slot() {
let accounts_sent: HashMap<_, _> = [acc2, acc3]
.iter()
.map(|acc| {
let ChannelMessage::Account(acc, _) = acc else {
let ChannelMessage::Account(acc, ..) = acc else {
unreachable!();
};
(acc.pubkey, acc.account.data.clone())

View File

@ -145,6 +145,7 @@ mod tests {
write_version: account.write_version,
},
account.slot_identifier.slot,
false,
),
)
.unwrap();

View File

@ -333,6 +333,7 @@ mod tests {
write_version: 1,
},
5,
false,
);
let message_3 = ChannelMessage::Account(
@ -348,6 +349,7 @@ mod tests {
write_version: 1,
},
5,
false,
);
let message_4 = ChannelMessage::Account(
@ -363,6 +365,7 @@ mod tests {
write_version: 1,
},
5,
false,
);
let message_5 = ChannelMessage::Account(
@ -378,6 +381,7 @@ mod tests {
write_version: 1,
},
5,
false,
);
// server loop
@ -443,7 +447,7 @@ mod tests {
let message_rx_2 = client_rx_queue.recv().unwrap();
let ChannelMessage::Account(account, slot) = &message_2 else {
let ChannelMessage::Account(account, slot, _) = &message_2 else {
panic!("message should be account");
};
let Message::AccountMsg(message_rx_2) = message_rx_2 else {
@ -456,7 +460,7 @@ mod tests {
let message_rx_3 = client_rx_queue.recv().unwrap();
let ChannelMessage::Account(account, slot) = &message_3 else {
let ChannelMessage::Account(account, slot, _) = &message_3 else {
panic!("message should be account");
};
let Message::AccountMsg(message_rx_3) = message_rx_3 else {
@ -468,7 +472,7 @@ mod tests {
assert_eq!(message_rx_3.slot_identifier.slot, *slot);
let message_rx_4 = client_rx_queue.recv().unwrap();
let ChannelMessage::Account(account, slot) = &message_4 else {
let ChannelMessage::Account(account, slot, _) = &message_4 else {
panic!("message should be account");
};
let Message::AccountMsg(message_rx_4) = message_rx_4 else {
@ -480,7 +484,7 @@ mod tests {
assert_eq!(message_rx_4.slot_identifier.slot, *slot);
let message_rx_5 = client_rx_queue.recv().unwrap();
let ChannelMessage::Account(account, slot) = &message_5 else {
let ChannelMessage::Account(account, slot, _) = &message_5 else {
panic!("message should be account");
};
let Message::AccountMsg(message_rx_5) = message_rx_5 else {

View File

@ -1,3 +1,4 @@
use anyhow::bail;
use quic_geyser_common::defaults::ALPN_GEYSER_PROTOCOL_ID;
use quic_geyser_common::defaults::DEFAULT_MAX_RECIEVE_WINDOW_SIZE;
use quic_geyser_common::defaults::MAX_DATAGRAM_SIZE;
@ -97,7 +98,11 @@ impl Client {
pub async fn new(
server_address: String,
connection_parameters: ConnectionParameters,
) -> anyhow::Result<(Client, tokio::sync::mpsc::UnboundedReceiver<Message>)> {
) -> anyhow::Result<(
Client,
tokio::sync::mpsc::UnboundedReceiver<Message>,
Vec<tokio::task::JoinHandle<anyhow::Result<()>>>,
)> {
let timeout: u64 = connection_parameters.timeout_in_seconds;
let endpoint = create_client_endpoint(connection_parameters);
let socket_addr = SocketAddr::from_str(&server_address)?;
@ -107,7 +112,7 @@ impl Client {
tokio::sync::mpsc::unbounded_channel::<Message>();
let connection = connecting.await?;
{
let jh1 = {
let connection = connection.clone();
tokio::spawn(async move {
// limit client to respond to 128k streams in parallel
@ -145,11 +150,12 @@ impl Client {
},
}
}
});
}
bail!("quic client stopped, connection lost")
})
};
// create a ping thread
{
let jh2 = {
let connection = connection.clone();
tokio::spawn(async move {
let ping_message = bincode::serialize(&Message::Ping)
@ -165,10 +171,11 @@ impl Client {
break;
}
}
});
}
bail!("quic client stopped, ping message thread dropped")
})
};
Ok((Client { connection }, message_rx_queue))
Ok((Client { connection }, message_rx_queue, vec![jh1, jh2]))
}
pub async fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
@ -279,6 +286,7 @@ mod tests {
write_version: account.write_version,
},
account.slot_identifier.slot,
false,
),
)
.unwrap();
@ -290,7 +298,7 @@ mod tests {
sleep(Duration::from_millis(100));
// server started
let (client, mut reciever) = Client::new(
let (client, mut reciever, _tasks) = Client::new(
url,
ConnectionParameters {
max_number_of_streams: 10,

View File

@ -13,7 +13,7 @@ pub struct AccountData {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelMessage {
Account(AccountData, Slot),
Account(AccountData, Slot, bool),
Slot(u64, u64, CommitmentConfig),
BlockMeta(BlockMeta),
Transaction(Box<Transaction>),

View File

@ -4,8 +4,8 @@ use serde::{Deserialize, Serialize};
#[repr(C)]
pub enum CompressionType {
None,
Lz4Fast(u32),
Lz4(u32),
Lz4Fast(i32),
Lz4(i32),
}
impl Default for CompressionType {
@ -22,17 +22,13 @@ impl CompressionType {
match self {
CompressionType::None => data.to_vec(),
CompressionType::Lz4Fast(speed) => lz4::block::compress(
data,
Some(lz4::block::CompressionMode::FAST(*speed as i32)),
true,
)
.expect("Compression should work"),
CompressionType::Lz4Fast(speed) => {
lz4::block::compress(data, Some(lz4::block::CompressionMode::FAST(*speed)), true)
.expect("Compression should work")
}
CompressionType::Lz4(compression) => lz4::block::compress(
data,
Some(lz4::block::CompressionMode::HIGHCOMPRESSION(
*compression as i32,
)),
Some(lz4::block::CompressionMode::HIGHCOMPRESSION(*compression)),
true,
)
.expect("compression should work"),

View File

@ -25,7 +25,8 @@ impl Filter {
match &self {
Filter::Account(account) => account.allows(message),
Filter::AccountsAll => match message {
ChannelMessage::Account(account, _) => {
ChannelMessage::Account(account, _, init) => {
!init &&
account.account.owner != solana_program::vote::program::ID // does not belong to vote program
&& account.account.owner != solana_program::stake::program::ID
// does not belong to stake program
@ -46,7 +47,7 @@ impl Filter {
Filter::TransactionsAll => matches!(message, ChannelMessage::Transaction(_)),
Filter::BlockAll => matches!(message, ChannelMessage::Block(_)),
Filter::DeletedAccounts => match message {
ChannelMessage::Account(account, _) => account.account.lamports == 0,
ChannelMessage::Account(account, _, _) => account.account.lamports == 0,
_ => false,
},
Filter::AccountsExcluding(account) => !account.allows(message),
@ -85,7 +86,10 @@ pub struct AccountFilter {
impl AccountFilter {
pub fn allows(&self, message: &ChannelMessage) -> bool {
if let ChannelMessage::Account(account, _) = message {
if let ChannelMessage::Account(account, _, init) = message {
if *init {
return false;
}
if let Some(owner) = self.owner {
if owner == account.account.owner {
// to do move the filtering somewhere else because here we need to decode the account data
@ -159,6 +163,16 @@ mod tests {
rent_epoch: 100,
};
let msg_0 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
account: solana_account_1.clone(),
write_version: 0,
},
0,
true,
);
let msg_1 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
@ -166,6 +180,7 @@ mod tests {
write_version: 0,
},
0,
false,
);
let msg_2 = ChannelMessage::Account(
@ -175,6 +190,7 @@ mod tests {
write_version: 0,
},
0,
false,
);
let msg_3 = ChannelMessage::Account(
@ -184,6 +200,7 @@ mod tests {
write_version: 0,
},
0,
false,
);
let f1 = AccountFilter {
@ -192,6 +209,7 @@ mod tests {
filter: None,
};
assert_eq!(f1.allows(&msg_0), false);
assert_eq!(f1.allows(&msg_1), true);
assert_eq!(f1.allows(&msg_2), true);
assert_eq!(f1.allows(&msg_3), false);
@ -201,6 +219,7 @@ mod tests {
accounts: None,
filter: Some(AccountFilterType::Datasize(9)),
};
assert_eq!(f2.allows(&msg_0), false);
assert_eq!(f2.allows(&msg_1), false);
assert_eq!(f2.allows(&msg_2), false);
assert_eq!(f2.allows(&msg_3), false);
@ -210,6 +229,7 @@ mod tests {
accounts: None,
filter: Some(AccountFilterType::Datasize(10)),
};
assert_eq!(f3.allows(&msg_0), false);
assert_eq!(f3.allows(&msg_1), true);
assert_eq!(f3.allows(&msg_2), true);
assert_eq!(f3.allows(&msg_3), false);
@ -222,6 +242,7 @@ mod tests {
data: crate::filters::MemcmpFilterData::Bytes(vec![3, 4, 5]),
})),
};
assert_eq!(f4.allows(&msg_0), false);
assert_eq!(f4.allows(&msg_1), true);
assert_eq!(f4.allows(&msg_2), false);
assert_eq!(f4.allows(&msg_3), false);
@ -234,6 +255,7 @@ mod tests {
data: crate::filters::MemcmpFilterData::Bytes(vec![13, 14, 15]),
})),
};
assert_eq!(f5.allows(&msg_0), false);
assert_eq!(f5.allows(&msg_1), false);
assert_eq!(f5.allows(&msg_2), true);
assert_eq!(f5.allows(&msg_3), false);

View File

@ -35,15 +35,13 @@ impl Account {
CompressionType::None => solana_account.data,
CompressionType::Lz4Fast(speed) => lz4::block::compress(
&solana_account.data,
Some(lz4::block::CompressionMode::FAST(speed as i32)),
Some(lz4::block::CompressionMode::FAST(speed)),
true,
)
.expect("Compression should work"),
CompressionType::Lz4(compression) => lz4::block::compress(
&solana_account.data,
Some(lz4::block::CompressionMode::HIGHCOMPRESSION(
compression as i32,
)),
Some(lz4::block::CompressionMode::HIGHCOMPRESSION(compression)),
true,
)
.expect("compression should work"),

View File

@ -225,7 +225,7 @@ fn blocking(args: Args, client_stats: ClientStats, break_thread: Arc<AtomicBool>
async fn non_blocking(args: Args, client_stats: ClientStats, break_thread: Arc<AtomicBool>) {
println!("Connecting");
let (client, mut reciever) = Client::new(
let (client, mut reciever, _tasks) = Client::new(
args.url,
ConnectionParameters {
max_number_of_streams: args.number_of_streams,

View File

@ -69,7 +69,7 @@ pub fn main() {
},
write_version,
};
let channel_message = ChannelMessage::Account(account, slot);
let channel_message = ChannelMessage::Account(account, slot, false);
quic_server.send_message(channel_message).unwrap();
}
}

View File

@ -97,6 +97,7 @@ impl GeyserPlugin for QuicGeyserPlugin {
write_version: account_info.write_version,
},
slot,
is_startup,
);
if let Some(block_channel) = &self.block_builder_channel {

View File

@ -14,7 +14,7 @@ pub struct Args {
pub port: u64,
#[clap(short, long, default_value_t = 8)]
pub compression_speed: u32,
pub compression_speed: i32,
#[clap(short, long, default_value_t = 50)]
pub max_number_of_connections: u64,

View File

@ -90,6 +90,7 @@ pub fn main() -> anyhow::Result<()> {
write_version: account_message.write_version,
},
account_message.slot_identifier.slot,
false,
)
}
quic_geyser_common::message::Message::SlotMsg(slot_message) => ChannelMessage::Slot(

View File

@ -609,7 +609,11 @@ fn create_dispatching_thread(
if !dispatching_connections.is_empty() {
let (message, priority) = match message {
ChannelMessage::Account(account, slot) => {
ChannelMessage::Account(account, slot, init) => {
if init {
// do not sent init messages
continue;
}
let slot_identifier = SlotIdentifier { slot };
let geyser_account = Account::new(
account.pubkey,