Moving to async csv to save the transaction logs, using broadcast channels between stats and logs

This commit is contained in:
Godmode Galactus 2023-03-16 16:25:26 +01:00
parent 3655da8cde
commit a22aebd193
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
6 changed files with 239 additions and 28 deletions

215
Cargo.lock generated
View File

@ -464,6 +464,64 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-executor"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17adb73da160dfb475c183343c8cccd80721ea5a605d3eb57125f0a7b7a92d0b"
dependencies = [
"async-lock",
"async-task",
"concurrent-queue",
"fastrand",
"futures-lite",
"slab",
]
[[package]]
name = "async-global-executor"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776"
dependencies = [
"async-channel",
"async-executor",
"async-io",
"async-lock",
"blocking",
"futures-lite",
"once_cell",
]
[[package]]
name = "async-io"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c374dda1ed3e7d8f0d9ba58715f924862c63eae6849c92d3a18e7fbde9e2794"
dependencies = [
"async-lock",
"autocfg 1.1.0",
"concurrent-queue",
"futures-lite",
"libc",
"log 0.4.17",
"parking",
"polling",
"slab",
"socket2",
"waker-fn",
"windows-sys 0.42.0",
]
[[package]]
name = "async-lock"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7"
dependencies = [
"event-listener",
]
[[package]]
name = "async-mutex"
version = "1.4.0"
@ -473,6 +531,32 @@ dependencies = [
"event-listener",
]
[[package]]
name = "async-std"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d"
dependencies = [
"async-channel",
"async-global-executor",
"async-io",
"async-lock",
"crossbeam-utils 0.8.15",
"futures-channel",
"futures-core",
"futures-io",
"futures-lite",
"gloo-timers",
"kv-log-macro",
"log 0.4.17",
"memchr",
"once_cell",
"pin-project-lite",
"pin-utils",
"slab",
"wasm-bindgen-futures",
]
[[package]]
name = "async-stream"
version = "0.3.4"
@ -495,6 +579,12 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "async-task"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524"
[[package]]
name = "async-trait"
version = "0.1.67"
@ -506,6 +596,12 @@ dependencies = [
"syn 2.0.2",
]
[[package]]
name = "atomic-waker"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "debc29dde2e69f9e47506b525f639ed42300fc014a3e007832592448fa8e4599"
[[package]]
name = "atty"
version = "0.2.14"
@ -753,6 +849,20 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae"
[[package]]
name = "blocking"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c67b173a56acffd6d2326fb7ab938ba0b00a71480e14902b2591c87bc5741e8"
dependencies = [
"async-channel",
"async-lock",
"async-task",
"atomic-waker",
"fastrand",
"futures-lite",
]
[[package]]
name = "borsh"
version = "0.9.3"
@ -883,6 +993,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d4260bcc2e8fc9df1eac4919a720effeb63a3f0952f5bf4944adfa18897f09"
dependencies = [
"memchr",
"once_cell",
"regex-automata",
"serde",
]
@ -1375,12 +1487,15 @@ dependencies = [
]
[[package]]
name = "csv"
version = "1.2.1"
name = "csv-async"
version = "1.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad"
checksum = "71933d3f2d0481d5111cb2817b15b6961961458ec58adf8008194e6c850046f4"
dependencies = [
"bstr",
"cfg-if 1.0.0",
"csv-core",
"futures 0.3.27",
"itoa",
"ryu",
"serde",
@ -1395,6 +1510,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "ctor"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
dependencies = [
"quote 1.0.26",
"syn 1.0.109",
]
[[package]]
name = "ctr"
version = "0.8.0"
@ -2113,6 +2238,21 @@ version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91"
[[package]]
name = "futures-lite"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-macro"
version = "0.3.27"
@ -2277,6 +2417,18 @@ dependencies = [
"regex",
]
[[package]]
name = "gloo-timers"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c"
dependencies = [
"futures-channel",
"futures-core",
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "goauth"
version = "0.13.1"
@ -2965,6 +3117,15 @@ dependencies = [
"winapi-build",
]
[[package]]
name = "kv-log-macro"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f"
dependencies = [
"log 0.4.17",
]
[[package]]
name = "language-tags"
version = "0.2.2"
@ -3147,6 +3308,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [
"cfg-if 1.0.0",
"value-bag",
]
[[package]]
@ -3245,13 +3407,14 @@ dependencies = [
"anyhow",
"arrayref",
"async-channel",
"async-std",
"async-trait",
"borsh 0.9.3",
"bytemuck",
"cargo-lock",
"chrono",
"clap 2.34.0",
"csv",
"csv-async",
"dashmap 5.4.0",
"fixed",
"fixed-macro",
@ -3841,6 +4004,12 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d52571ddcb42e9c900c901a18d8d67e393df723fcd51dd59c5b1a85d0acb6cc"
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "parking_lot"
version = "0.3.8"
@ -4107,6 +4276,22 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
[[package]]
name = "polling"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e1f879b2998099c2d69ab9605d145d5b661195627eccc680002c4918a7fb6fa"
dependencies = [
"autocfg 1.1.0",
"bitflags",
"cfg-if 1.0.0",
"concurrent-queue",
"libc",
"log 0.4.17",
"pin-project-lite",
"windows-sys 0.45.0",
]
[[package]]
name = "polyval"
version = "0.5.3"
@ -4746,6 +4931,12 @@ dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
[[package]]
name = "regex-syntax"
version = "0.6.28"
@ -8130,6 +8321,16 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "value-bag"
version = "1.0.0-alpha.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55"
dependencies = [
"ctor",
"version_check 0.9.4",
]
[[package]]
name = "vcpkg"
version = "0.2.15"
@ -8178,6 +8379,12 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.3"

View File

@ -51,12 +51,13 @@ solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", br
solana-program = "1.9.17"
thiserror = "1.0"
csv = "1.0.0"
csv-async = "1.2"
async-std = "1.12.0"
tonic = { version = "0.8.2", features = ["gzip", "tls", "tls-roots"] }
tokio = { version = "1", features = ["full"] }
anyhow = "1.0"
async-channel = "1.6"
async-trait = "0.1"
async-trait = "0.1.66"
prost = "0.11"
warp = "0.3"
futures = "0.3.17"

View File

@ -20,8 +20,7 @@ use solana_transaction_status::{
use crate::states::{BlockData, TransactionConfirmRecord, TransactionSendRecord};
use async_channel::Sender;
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle, time::Instant};
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle, time::Instant, sync::broadcast::Sender};
pub async fn process_blocks(
block: &UiConfirmedBlock,
@ -100,7 +99,6 @@ pub async fn process_blocks(
timed_out: false,
priority_fees: transaction_record.priority_fees,
})
.await
{
Ok(_) => {}
Err(e) => {
@ -134,8 +132,8 @@ pub async fn process_blocks(
pub fn confirmations_by_blocks(
client: Arc<RpcClient>,
mut tx_record_rx: UnboundedReceiver<TransactionSendRecord>,
tx_confirm_records: Sender<TransactionConfirmRecord>,
tx_block_data: Sender<BlockData>,
tx_confirm_records: tokio::sync::broadcast::Sender<TransactionConfirmRecord>,
tx_block_data: tokio::sync::broadcast::Sender<BlockData>,
from_slot: u64,
exit_signal: Arc<AtomicBool>,
) -> Vec<JoinHandle<()>> {
@ -210,8 +208,7 @@ pub fn confirmations_by_blocks(
slot_leader: None,
timed_out: true,
priority_fees: sent_record.priority_fees,
})
.await;
});
to_remove.push(signature.clone());
}
}

View File

@ -203,19 +203,19 @@ pub async fn main() -> anyhow::Result<()> {
let mut tasks = vec![];
tasks.push(blockhash_thread);
let (tx_status_sx, tx_status_rx) = async_channel::unbounded();
let (block_status_sx, block_status_rx) = async_channel::unbounded();
let (tx_status_sx, tx_status_rx) = tokio::sync::broadcast::channel(1024);
let (block_status_sx, block_status_rx) = tokio::sync::broadcast::channel(1024);
let mut writers_jh = initialize_result_writers(
transaction_save_file,
block_data_save_file,
tx_status_rx.clone(),
block_status_rx.clone(),
tx_status_rx,
block_status_rx,
);
tasks.append(&mut writers_jh);
let stats_handle =
mango_sim_stats.update_from_tx_status_stream(tx_status_rx.clone(), exit_signal.clone());
mango_sim_stats.update_from_tx_status_stream(tx_status_sx.subscribe(), exit_signal.clone());
tasks.push(stats_handle);
let mut confirmation_threads = confirmations_by_blocks(

View File

@ -1,6 +1,5 @@
use async_channel::Receiver;
use tokio::task::JoinHandle;
use tokio::{task::JoinHandle, sync::broadcast::Receiver};
use async_std::fs::File;
use crate::states::{BlockData, TransactionConfirmRecord};
pub fn initialize_result_writers(
@ -13,30 +12,36 @@ pub fn initialize_result_writers(
if !transaction_save_file.is_empty() {
let tx_data_jh = tokio::spawn(async move {
let mut writer = csv::Writer::from_path(transaction_save_file).unwrap();
let mut writer = csv_async::AsyncSerializer::from_writer(
File::create(transaction_save_file).await.unwrap()
);
let mut tx_data = tx_data;
loop {
if let Ok(record) = tx_data.recv().await {
writer.serialize(record).unwrap();
writer.serialize(record).await.unwrap();
} else {
break;
}
}
writer.flush().unwrap();
writer.flush().await.unwrap();
});
tasks.push(tx_data_jh);
}
if !block_data_save_file.is_empty() {
let block_data_jh = tokio::spawn(async move {
let mut writer = csv::Writer::from_path(block_data_save_file).unwrap();
let mut writer = csv_async::AsyncSerializer::from_writer(
File::create(block_data_save_file).await.unwrap()
);
let mut block_data = block_data;
loop {
if let Ok(record) = block_data.recv().await {
writer.serialize(record).unwrap();
writer.serialize(record).await.unwrap();
} else {
break;
}
}
writer.flush().unwrap();
writer.flush().await.unwrap();
});
tasks.push(block_data_jh);
}

View File

@ -62,11 +62,12 @@ impl MangoSimulationStats {
pub fn update_from_tx_status_stream(
&self,
tx_confirm_record_reciever: async_channel::Receiver<TransactionConfirmRecord>,
tx_confirm_record_reciever: tokio::sync::broadcast::Receiver<TransactionConfirmRecord>,
do_exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let counters = self.counters.clone();
tokio::spawn(async move {
let mut tx_confirm_record_reciever = tx_confirm_record_reciever;
loop {
if do_exit.load(Ordering::Relaxed) {
break;