Write stage optimizations (#1534)
- Testnet dashboard shows that channel pressure for write stage is incrementing on every iteration of write. - This change optimizes ledger writing by removing cloning of map and reducing calls to flush
This commit is contained in:
parent
7611730cdb
commit
639c93460a
|
@ -368,7 +368,7 @@ impl LedgerWriter {
|
||||||
Ok(LedgerWriter { index, data })
|
Ok(LedgerWriter { index, data })
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> {
|
pub fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> {
|
||||||
let len = serialized_size(&entry).map_err(err_bincode_to_io)?;
|
let len = serialized_size(&entry).map_err(err_bincode_to_io)?;
|
||||||
|
|
||||||
serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?;
|
serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?;
|
||||||
|
@ -395,13 +395,17 @@ impl LedgerWriter {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
|
pub fn flush(&mut self) -> io::Result<()> {
|
||||||
self.write_entry_noflush(&entry)?;
|
|
||||||
self.index.flush()?;
|
self.index.flush()?;
|
||||||
self.data.flush()?;
|
self.data.flush()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
|
||||||
|
self.write_entry_noflush(&entry)?;
|
||||||
|
self.flush()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn write_entries<I>(&mut self, entries: I) -> io::Result<()>
|
pub fn write_entries<I>(&mut self, entries: I) -> io::Result<()>
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = Entry>,
|
I: IntoIterator<Item = Entry>,
|
||||||
|
@ -409,9 +413,7 @@ impl LedgerWriter {
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
self.write_entry_noflush(&entry)?;
|
self.write_entry_noflush(&entry)?;
|
||||||
}
|
}
|
||||||
self.index.flush()?;
|
self.flush()
|
||||||
self.data.flush()?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -166,15 +166,15 @@ impl WriteStage {
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
for entries in ventries {
|
for entries in ventries {
|
||||||
for e in &entries {
|
|
||||||
num_txs += e.transactions.len();
|
|
||||||
}
|
|
||||||
let cluster_info_votes_start = Instant::now();
|
let cluster_info_votes_start = Instant::now();
|
||||||
let votes = &entries.votes();
|
let votes = &entries.votes();
|
||||||
cluster_info.write().unwrap().insert_votes(&votes);
|
cluster_info.write().unwrap().insert_votes(&votes);
|
||||||
cluster_info_votes_total += duration_as_ms(&cluster_info_votes_start.elapsed());
|
cluster_info_votes_total += duration_as_ms(&cluster_info_votes_start.elapsed());
|
||||||
|
|
||||||
ledger_writer.write_entries(entries.clone())?;
|
for e in &entries {
|
||||||
|
num_txs += e.transactions.len();
|
||||||
|
ledger_writer.write_entry_noflush(&e)?;
|
||||||
|
}
|
||||||
// Once the entries have been written to the ledger, then we can
|
// Once the entries have been written to the ledger, then we can
|
||||||
// safely incement entry height
|
// safely incement entry height
|
||||||
*entry_height += entries.len() as u64;
|
*entry_height += entries.len() as u64;
|
||||||
|
@ -196,6 +196,7 @@ impl WriteStage {
|
||||||
|
|
||||||
entries_send_total += duration_as_ms(&entries_send_start.elapsed());
|
entries_send_total += duration_as_ms(&entries_send_start.elapsed());
|
||||||
}
|
}
|
||||||
|
ledger_writer.flush()?;
|
||||||
inc_new_counter_info!(
|
inc_new_counter_info!(
|
||||||
"write_stage-time_ms",
|
"write_stage-time_ms",
|
||||||
duration_as_ms(&now.elapsed()) as usize
|
duration_as_ms(&now.elapsed()) as usize
|
||||||
|
|
Loading…
Reference in New Issue