try to do deserialization of transaction in a rayon thread (#4801)

* try to do deserialization of transaction in a rayon thread

* Try tokio::task::block_in_place instead

* fix tests

* add deserialize block into rayon pool

* fill some docs

Co-authored-by: teor <teor@riseup.net>
This commit is contained in:
Alfredo Garcia 2022-07-21 20:14:58 -03:00 committed by GitHub
parent 21d3af4b8d
commit 71fe4c4c73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 62 additions and 11 deletions

1
Cargo.lock generated
View File

@ -6408,6 +6408,7 @@ dependencies = [
"proptest",
"proptest-derive",
"rand 0.8.5",
"rayon",
"regex",
"serde",
"static_assertions",

View File

@ -24,6 +24,7 @@ lazy_static = "1.4.0"
ordered-map = "0.4.2"
pin-project = "1.0.10"
rand = { version = "0.8.5", package = "rand" }
rayon = "1.5.3"
regex = "1.5.6"
serde = { version = "1.0.137", features = ["serde_derive"] }
thiserror = "1.0.31"

View File

@ -570,8 +570,9 @@ impl Codec {
Ok(Message::GetAddr)
}
fn read_block<R: Read>(&self, reader: R) -> Result<Message, Error> {
Ok(Message::Block(Block::zcash_deserialize(reader)?.into()))
fn read_block<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
let result = Self::deserialize_block_spawning(reader);
Ok(Message::Block(result?.into()))
}
fn read_getblocks<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
@ -625,8 +626,9 @@ impl Codec {
Ok(Message::NotFound(Vec::zcash_deserialize(reader)?))
}
fn read_tx<R: Read>(&self, reader: R) -> Result<Message, Error> {
Ok(Message::Tx(Transaction::zcash_deserialize(reader)?.into()))
fn read_tx<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
let result = Self::deserialize_transaction_spawning(reader);
Ok(Message::Tx(result?.into()))
}
fn read_mempool<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
@ -674,6 +676,52 @@ impl Codec {
fn read_filterclear<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
Ok(Message::FilterClear)
}
/// Given the reader, deserialize the transaction in the rayon thread pool.
#[allow(clippy::unwrap_in_result)]
fn deserialize_transaction_spawning<R: Read + std::marker::Send>(
reader: R,
) -> Result<Transaction, Error> {
let mut result = None;
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// Since we use `block_in_place()`, other futures running on the connection task will be blocked:
// https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
//
// We can't use `spawn_blocking()` because:
// - The `reader` has a lifetime (but we could replace it with a `Vec` of message data)
// - There is no way to check the blocking task's future for panics
tokio::task::block_in_place(|| {
rayon::in_place_scope_fifo(|s| {
s.spawn_fifo(|_s| result = Some(Transaction::zcash_deserialize(reader)))
})
});
result.expect("scope has already finished")
}
/// Given the reader, deserialize the block in the rayon thread pool.
#[allow(clippy::unwrap_in_result)]
fn deserialize_block_spawning<R: Read + std::marker::Send>(reader: R) -> Result<Block, Error> {
let mut result = None;
// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// Since we use `block_in_place()`, other futures running on the connection task will be blocked:
// https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
//
// We can't use `spawn_blocking()` because:
// - The `reader` has a lifetime (but we could replace it with a `Vec` of message data)
// - There is no way to check the blocking task's future for panics
tokio::task::block_in_place(|| {
rayon::in_place_scope_fifo(|s| {
s.spawn_fifo(|_s| result = Some(Block::zcash_deserialize(reader)))
})
});
result.expect("scope has already finished")
}
}
// XXX replace these interior unit tests with exterior integration tests + proptest
@ -943,7 +991,8 @@ mod tests {
fn max_msg_size_round_trip() {
use zebra_chain::serialization::ZcashDeserializeInto;
let rt = zebra_test::init_async();
//let rt = zebra_test::init_async();
zebra_test::init();
// make tests with a Tx message
let tx: Transaction = zebra_test::vectors::DUMMY_TX1
@ -957,7 +1006,7 @@ mod tests {
let size = 85;
// reducing the max size to body size - 1
rt.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let mut bytes = Vec::new();
{
let mut fw = FramedWrite::new(
@ -971,7 +1020,7 @@ mod tests {
});
// send again with the msg body size as max size
let msg_bytes = rt.block_on(async {
let msg_bytes = zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let mut bytes = Vec::new();
{
let mut fw = FramedWrite::new(
@ -986,7 +1035,7 @@ mod tests {
});
// receive with a reduced max size
rt.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let mut fr = FramedRead::new(
Cursor::new(&msg_bytes),
Codec::builder().with_max_body_len(size - 1).finish(),
@ -998,7 +1047,7 @@ mod tests {
});
// receive again with the tx size as max size
rt.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let mut fr = FramedRead::new(
Cursor::new(&msg_bytes),
Codec::builder().with_max_body_len(size).finish(),

View File

@ -336,7 +336,7 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> {
///
/// Uses a Zebra network stack's peer set to query an isolated Zebra TCP connection,
/// with an unrelated transaction test responder.
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError> {
// We respond with an unrelated transaction, so the peer gives up on the request.
let unrelated_response: Transaction =
@ -486,7 +486,7 @@ async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError
/// - returns a `NotFoundRegistry` error for repeated requests to a non-responsive peer.
///
/// The requests are coming from the full stack to the isolated peer.
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn outbound_tx_partial_response_notfound() -> Result<(), crate::BoxError> {
// We repeatedly respond with the same transaction, so the peer gives up on the second response.
let repeated_tx: Transaction = zebra_test::vectors::DUMMY_TX1.zcash_deserialize_into()?;