diff --git a/Cargo.lock b/Cargo.lock index 8cdd38c..8740d11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -464,6 +464,64 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-executor" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17adb73da160dfb475c183343c8cccd80721ea5a605d3eb57125f0a7b7a92d0b" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c374dda1ed3e7d8f0d9ba58715f924862c63eae6849c92d3a18e7fbde9e2794" +dependencies = [ + "async-lock", + "autocfg 1.1.0", + "concurrent-queue", + "futures-lite", + "libc", + "log 0.4.17", + "parking", + "polling", + "slab", + "socket2", + "waker-fn", + "windows-sys 0.42.0", +] + +[[package]] +name = "async-lock" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa24f727524730b077666307f2734b4a1a1c57acb79193127dcc8914d5242dd7" +dependencies = [ + "event-listener", +] + [[package]] name = "async-mutex" version = "1.4.0" @@ -473,6 +531,32 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils 0.8.15", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log 0.4.17", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-stream" version = "0.3.4" @@ -495,6 +579,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-task" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524" + [[package]] name = "async-trait" version = "0.1.67" @@ -506,6 +596,12 @@ dependencies = [ "syn 2.0.2", ] +[[package]] +name = "atomic-waker" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "debc29dde2e69f9e47506b525f639ed42300fc014a3e007832592448fa8e4599" + [[package]] name = "atty" version = "0.2.14" @@ -753,6 +849,20 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" +[[package]] +name = "blocking" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c67b173a56acffd6d2326fb7ab938ba0b00a71480e14902b2591c87bc5741e8" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", +] + [[package]] name = "borsh" version = "0.9.3" @@ -883,6 +993,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3d4260bcc2e8fc9df1eac4919a720effeb63a3f0952f5bf4944adfa18897f09" dependencies = [ "memchr", + "once_cell", + "regex-automata", "serde", ] @@ -1375,12 +1487,15 @@ dependencies = [ ] [[package]] -name = "csv" -version = "1.2.1" +name = "csv-async" +version = "1.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad" +checksum = "71933d3f2d0481d5111cb2817b15b6961961458ec58adf8008194e6c850046f4" dependencies = [ + "bstr", + "cfg-if 1.0.0", "csv-core", + "futures 0.3.27", "itoa", "ryu", "serde", @@ -1395,6 +1510,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctor" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" +dependencies = [ + "quote 1.0.26", + "syn 1.0.109", +] + [[package]] name = "ctr" version = "0.8.0" @@ -2113,6 +2238,21 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d422fa3cbe3b40dca574ab087abb5bc98258ea57eea3fd6f1fa7162c778b91" +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.27" @@ -2277,6 +2417,18 @@ dependencies = [ "regex", ] +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "goauth" version = "0.13.1" @@ -2965,6 +3117,15 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log 0.4.17", +] + [[package]] name = "language-tags" version = "0.2.2" @@ -3147,6 +3308,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if 1.0.0", + "value-bag", ] [[package]] @@ -3245,13 +3407,14 @@ dependencies = [ "anyhow", "arrayref", "async-channel", + "async-std", "async-trait", "borsh 0.9.3", "bytemuck", "cargo-lock", "chrono", "clap 2.34.0", - "csv", + "csv-async", "dashmap 5.4.0", "fixed", "fixed-macro", @@ -3841,6 +4004,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d52571ddcb42e9c900c901a18d8d67e393df723fcd51dd59c5b1a85d0acb6cc" +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + [[package]] name = "parking_lot" version = "0.3.8" @@ -4107,6 +4276,22 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "polling" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e1f879b2998099c2d69ab9605d145d5b661195627eccc680002c4918a7fb6fa" +dependencies = [ + "autocfg 1.1.0", + "bitflags", + "cfg-if 1.0.0", + "concurrent-queue", + "libc", + "log 0.4.17", + "pin-project-lite", + "windows-sys 0.45.0", +] + [[package]] name = "polyval" version = "0.5.3" @@ -4746,6 +4931,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" + [[package]] name = "regex-syntax" version = "0.6.28" @@ -8130,6 +8321,16 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "value-bag" +version = "1.0.0-alpha.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55" +dependencies = [ + "ctor", + "version_check 0.9.4", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -8178,6 +8379,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + [[package]] name = "walkdir" version = "2.3.3" diff --git a/Cargo.toml b/Cargo.toml index be4db92..93280ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,12 +51,13 @@ solana-account-decoder = { git = "https://github.com/solana-labs/solana.git", br solana-program = "1.9.17" thiserror = "1.0" -csv = "1.0.0" +csv-async = "1.2" +async-std = "1.12.0" tonic = { version = "0.8.2", features = ["gzip", "tls", "tls-roots"] } tokio = { version = "1", features = ["full"] } anyhow = "1.0" async-channel = "1.6" -async-trait = "0.1" +async-trait = "0.1.66" prost = "0.11" warp = "0.3" futures = "0.3.17" diff --git a/src/confirmation_strategies.rs b/src/confirmation_strategies.rs index 4769917..f6d430c 100644 --- a/src/confirmation_strategies.rs +++ b/src/confirmation_strategies.rs @@ -20,8 +20,7 @@ use solana_transaction_status::{ use crate::states::{BlockData, TransactionConfirmRecord, TransactionSendRecord}; -use async_channel::Sender; -use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle, time::Instant}; +use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle, time::Instant, sync::broadcast::Sender}; pub async fn process_blocks( block: &UiConfirmedBlock, @@ -100,7 +99,6 @@ pub async fn process_blocks( timed_out: false, priority_fees: transaction_record.priority_fees, }) - .await { Ok(_) => {} Err(e) => { @@ -134,8 +132,8 @@ pub async fn process_blocks( pub fn confirmations_by_blocks( client: Arc, mut tx_record_rx: UnboundedReceiver, - tx_confirm_records: Sender, - tx_block_data: Sender, + tx_confirm_records: tokio::sync::broadcast::Sender, + tx_block_data: tokio::sync::broadcast::Sender, from_slot: u64, exit_signal: Arc, ) -> Vec> { @@ -210,8 +208,7 @@ pub fn confirmations_by_blocks( slot_leader: None, timed_out: true, priority_fees: sent_record.priority_fees, - }) - .await; + }); to_remove.push(signature.clone()); } } diff --git a/src/main.rs b/src/main.rs index 9a04f15..27e4066 100644 --- a/src/main.rs +++ b/src/main.rs @@ -203,19 +203,19 @@ pub async fn main() -> anyhow::Result<()> { let mut tasks = vec![]; tasks.push(blockhash_thread); - let (tx_status_sx, tx_status_rx) = async_channel::unbounded(); - let (block_status_sx, block_status_rx) = async_channel::unbounded(); + let (tx_status_sx, tx_status_rx) = tokio::sync::broadcast::channel(1024); + let (block_status_sx, block_status_rx) = tokio::sync::broadcast::channel(1024); let mut writers_jh = initialize_result_writers( transaction_save_file, block_data_save_file, - tx_status_rx.clone(), - block_status_rx.clone(), + tx_status_rx, + block_status_rx, ); tasks.append(&mut writers_jh); let stats_handle = - mango_sim_stats.update_from_tx_status_stream(tx_status_rx.clone(), exit_signal.clone()); + mango_sim_stats.update_from_tx_status_stream(tx_status_sx.subscribe(), exit_signal.clone()); tasks.push(stats_handle); let mut confirmation_threads = confirmations_by_blocks( diff --git a/src/result_writer.rs b/src/result_writer.rs index 6bad889..037ee5a 100644 --- a/src/result_writer.rs +++ b/src/result_writer.rs @@ -1,6 +1,5 @@ -use async_channel::Receiver; -use tokio::task::JoinHandle; - +use tokio::{task::JoinHandle, sync::broadcast::Receiver}; +use async_std::fs::File; use crate::states::{BlockData, TransactionConfirmRecord}; pub fn initialize_result_writers( @@ -13,30 +12,36 @@ pub fn initialize_result_writers( if !transaction_save_file.is_empty() { let tx_data_jh = tokio::spawn(async move { - let mut writer = csv::Writer::from_path(transaction_save_file).unwrap(); + let mut writer = csv_async::AsyncSerializer::from_writer( + File::create(transaction_save_file).await.unwrap() + ); + let mut tx_data = tx_data; loop { if let Ok(record) = tx_data.recv().await { - writer.serialize(record).unwrap(); + writer.serialize(record).await.unwrap(); } else { break; } } - writer.flush().unwrap(); + writer.flush().await.unwrap(); }); tasks.push(tx_data_jh); } if !block_data_save_file.is_empty() { let block_data_jh = tokio::spawn(async move { - let mut writer = csv::Writer::from_path(block_data_save_file).unwrap(); + let mut writer = csv_async::AsyncSerializer::from_writer( + File::create(block_data_save_file).await.unwrap() + ); + let mut block_data = block_data; loop { if let Ok(record) = block_data.recv().await { - writer.serialize(record).unwrap(); + writer.serialize(record).await.unwrap(); } else { break; } } - writer.flush().unwrap(); + writer.flush().await.unwrap(); }); tasks.push(block_data_jh); } diff --git a/src/stats.rs b/src/stats.rs index 49e3c5f..4a023bd 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -62,11 +62,12 @@ impl MangoSimulationStats { pub fn update_from_tx_status_stream( &self, - tx_confirm_record_reciever: async_channel::Receiver, + tx_confirm_record_reciever: tokio::sync::broadcast::Receiver, do_exit: Arc, ) -> JoinHandle<()> { let counters = self.counters.clone(); tokio::spawn(async move { + let mut tx_confirm_record_reciever = tx_confirm_record_reciever; loop { if do_exit.load(Ordering::Relaxed) { break;