From 58a4905916a3bf1db1f91347d86a8407b8dd224e Mon Sep 17 00:00:00 2001 From: carllin Date: Sat, 22 Dec 2018 19:30:30 -0800 Subject: [PATCH] Make reconstruct_entries_from_blobs() support Blobs and borrowed SharedBlobs, make distinction between to_blobs and to_shared_blobs (#2270) --- benches/db_ledger.rs | 10 ++-- src/broadcast_service.rs | 2 +- src/cluster_info.rs | 12 ++-- src/db_ledger.rs | 62 +++++++++++--------- src/db_window.rs | 42 ++++++++------ src/entry.rs | 43 ++++++++------ src/erasure.rs | 2 +- src/ledger.rs | 15 +++-- src/packet.rs | 38 ++++++++----- src/window.rs | 119 --------------------------------------- tests/multinode.rs | 7 ++- 11 files changed, 135 insertions(+), 217 deletions(-) diff --git a/benches/db_ledger.rs b/benches/db_ledger.rs index 728d0f05e3..0dc35b99b7 100644 --- a/benches/db_ledger.rs +++ b/benches/db_ledger.rs @@ -44,7 +44,7 @@ fn setup_read_bench( entries.extend(make_tiny_test_entries(num_small_blobs as usize)); // Convert the entries to blobs, write the blobs to the ledger - let shared_blobs = entries.to_blobs(); + let shared_blobs = entries.to_shared_blobs(); for b in shared_blobs.iter() { b.write().unwrap().set_slot(slot).unwrap(); } @@ -60,7 +60,7 @@ fn bench_write_small(bench: &mut Bencher) { let ledger_path = get_tmp_ledger_path("bench_write_small"); let num_entries = 32 * 1024; let entries = make_tiny_test_entries(num_entries); - let shared_blobs = entries.to_blobs(); + let shared_blobs = entries.to_shared_blobs(); let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect(); let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect(); bench_write_blobs(bench, &mut blobs, &ledger_path); @@ -73,7 +73,7 @@ fn bench_write_big(bench: &mut Bencher) { let ledger_path = get_tmp_ledger_path("bench_write_big"); let num_entries = 32 * 1024; let entries = make_tiny_test_entries(num_entries); - let shared_blobs = entries.to_blobs(); + let shared_blobs = entries.to_shared_blobs(); let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect(); let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect(); bench_write_blobs(bench, &mut blobs, &ledger_path); @@ -147,7 +147,7 @@ fn bench_insert_data_blob_small(bench: &mut Bencher) { DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger"); let num_entries = 32 * 1024; let entries = make_tiny_test_entries(num_entries); - let mut shared_blobs = entries.to_blobs(); + let mut shared_blobs = entries.to_shared_blobs(); shared_blobs.shuffle(&mut thread_rng()); bench.iter(move || { @@ -172,7 +172,7 @@ fn bench_insert_data_blob_big(bench: &mut Bencher) { DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger"); let num_entries = 32 * 1024; let entries = make_large_test_entries(num_entries); - let mut shared_blobs = entries.to_blobs(); + let mut shared_blobs = entries.to_shared_blobs(); shared_blobs.shuffle(&mut thread_rng()); bench.iter(move || { diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index 4272e18954..f36d99a418 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -72,7 +72,7 @@ fn broadcast( let blobs: Vec<_> = ventries .into_par_iter() - .flat_map(|p| p.to_blobs()) + .flat_map(|p| p.to_shared_blobs()) .collect(); let blobs_slot_heights: Vec<(SharedBlob, u64)> = blobs.into_iter().zip(slot_heights).collect(); diff --git a/src/cluster_info.rs b/src/cluster_info.rs index a93c89909f..fad378deeb 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -20,7 +20,7 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; use crate::crds_value::{CrdsValue, CrdsValueLabel, LeaderId}; use crate::db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT}; -use crate::packet::{to_blob, Blob, SharedBlob, BLOB_SIZE}; +use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE}; use crate::result::Result; use crate::rpc::RPC_PORT; use crate::streamer::{BlobReceiver, BlobSender}; @@ -602,7 +602,7 @@ impl ClusterInfo { let reqs = obj.write().unwrap().gossip_request(); let blobs = reqs .into_iter() - .filter_map(|(remote_gossip_addr, req)| to_blob(req, remote_gossip_addr).ok()) + .filter_map(|(remote_gossip_addr, req)| to_shared_blob(req, remote_gossip_addr).ok()) .collect(); blob_sender.send(blobs)?; Ok(()) @@ -760,7 +760,7 @@ impl ClusterInfo { from.gossip = *from_addr; } inc_new_counter_info!("cluster_info-pull_request-rsp", len); - to_blob(rsp, from.gossip).ok().into_iter().collect() + to_shared_blob(rsp, from.gossip).ok().into_iter().collect() } } fn handle_pull_response(me: &Arc>, from: Pubkey, data: Vec) { @@ -805,13 +805,15 @@ impl ClusterInfo { }; prune_msg.sign(&me.read().unwrap().keypair); let rsp = Protocol::PruneMessage(self_id, prune_msg); - to_blob(rsp, ci.gossip).ok() + to_shared_blob(rsp, ci.gossip).ok() }) .into_iter() .collect(); let mut blobs: Vec<_> = pushes .into_iter() - .filter_map(|(remote_gossip_addr, req)| to_blob(req, remote_gossip_addr).ok()) + .filter_map(|(remote_gossip_addr, req)| { + to_shared_blob(req, remote_gossip_addr).ok() + }) .collect(); rsp.append(&mut blobs); rsp diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 70830789bf..7cfb4bf652 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -354,9 +354,10 @@ impl DbLedger { pub fn write_blobs<'a, I>(&self, blobs: I) -> Result> where - I: IntoIterator, + I: IntoIterator, + I::Item: Borrow<&'a Blob>, { - let blobs = blobs.into_iter().cloned(); + let blobs = blobs.into_iter().map(|b| *b.borrow()); let new_entries = self.insert_data_blobs(blobs)?; Ok(new_entries) } @@ -366,17 +367,18 @@ impl DbLedger { I: IntoIterator, I::Item: Borrow, { - let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| { - let b = entry.borrow().to_blob(); - { - let mut w_b = b.write().unwrap(); - w_b.set_index(idx as u64).unwrap(); - w_b.set_slot(slot).unwrap(); - } - b - }); + let blobs: Vec<_> = entries + .into_iter() + .enumerate() + .map(|(idx, entry)| { + let mut b = entry.borrow().to_blob(); + b.set_index(idx as u64).unwrap(); + b.set_slot(slot).unwrap(); + b + }) + .collect(); - self.write_shared_blobs(shared_blobs) + self.write_blobs(&blobs) } pub fn insert_data_blobs(&self, new_blobs: I) -> Result> @@ -700,15 +702,19 @@ where let db_ledger = DbLedger::open(ledger_path)?; // TODO sign these blobs with keypair - let blobs = entries.into_iter().enumerate().map(|(idx, entry)| { - let b = entry.borrow().to_blob(); - b.write().unwrap().set_index(idx as u64).unwrap(); - b.write().unwrap().set_id(&keypair.pubkey()).unwrap(); - b.write().unwrap().set_slot(DEFAULT_SLOT_HEIGHT).unwrap(); - b - }); + let blobs: Vec<_> = entries + .into_iter() + .enumerate() + .map(|(idx, entry)| { + let mut b = entry.borrow().to_blob(); + b.set_index(idx as u64).unwrap(); + b.set_id(&keypair.pubkey()).unwrap(); + b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap(); + b + }) + .collect(); - db_ledger.write_shared_blobs(blobs)?; + db_ledger.write_blobs(&blobs[..])?; Ok(()) } @@ -768,7 +774,7 @@ mod tests { #[test] fn test_get_blobs_bytes() { - let shared_blobs = make_tiny_test_entries(10).to_blobs(); + let shared_blobs = make_tiny_test_entries(10).to_shared_blobs(); let slot = DEFAULT_SLOT_HEIGHT; index_blobs( shared_blobs.iter().zip(vec![slot; 10].into_iter()), @@ -838,7 +844,7 @@ mod tests { #[test] fn test_insert_data_blobs_basic() { let entries = make_tiny_test_entries(2); - let shared_blobs = entries.to_blobs(); + let shared_blobs = entries.to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { b.write().unwrap().set_index(i as u64).unwrap(); @@ -882,7 +888,7 @@ mod tests { fn test_insert_data_blobs_multiple() { let num_blobs = 10; let entries = make_tiny_test_entries(num_blobs); - let shared_blobs = entries.to_blobs(); + let shared_blobs = entries.to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { b.write().unwrap().set_index(i as u64).unwrap(); } @@ -919,7 +925,7 @@ mod tests { fn test_insert_data_blobs_slots() { let num_blobs = 10; let entries = make_tiny_test_entries(num_blobs); - let shared_blobs = entries.to_blobs(); + let shared_blobs = entries.to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { b.write().unwrap().set_index(i as u64).unwrap(); } @@ -966,7 +972,7 @@ mod tests { // Write entries let num_entries = 8; - let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); + let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { let mut w_b = b.write().unwrap(); @@ -1009,7 +1015,7 @@ mod tests { // Write entries let num_entries = 20 as u64; let original_entries = make_tiny_test_entries(num_entries as usize); - let shared_blobs = original_entries.clone().to_blobs(); + let shared_blobs = original_entries.clone().to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { let mut w_b = b.write().unwrap(); w_b.set_index(i as u64).unwrap(); @@ -1055,7 +1061,7 @@ mod tests { .flat_map(|e| vec![e; num_duplicates]) .collect(); - let shared_blobs = original_entries.clone().to_blobs(); + let shared_blobs = original_entries.clone().to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { let index = (i / 2) as u64; let mut w_b = b.write().unwrap(); @@ -1106,7 +1112,7 @@ mod tests { // Write entries let num_entries = 20 as u64; let original_entries = make_tiny_test_entries(num_entries as usize); - let shared_blobs = original_entries.to_blobs(); + let shared_blobs = original_entries.to_shared_blobs(); for (i, b) in shared_blobs.iter().enumerate() { let mut w_b = b.write().unwrap(); w_b.set_index(i as u64).unwrap(); diff --git a/src/db_window.rs b/src/db_window.rs index 4c2e7e784d..8039bd3d18 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -12,6 +12,7 @@ use crate::streamer::BlobSender; use log::Level; use solana_metrics::{influxdb, submit}; use solana_sdk::pubkey::Pubkey; +use std::borrow::Borrow; use std::cmp; use std::net::SocketAddr; use std::sync::atomic::AtomicUsize; @@ -299,7 +300,7 @@ pub fn process_blob( .put(&erasure_key, &rblob.data[..BLOB_HEADER_SIZE + size])?; vec![] } else { - db_ledger.insert_data_blobs(vec![&*blob.read().unwrap()])? + db_ledger.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])? }; #[cfg(feature = "erasure")] @@ -402,6 +403,7 @@ mod test { use crate::packet::{index_blobs, Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; use crate::streamer::{receiver, responder, PacketReceiver}; use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::borrow::Borrow; use std::io; use std::io::Write; use std::net::UdpSocket; @@ -532,7 +534,7 @@ mod test { assert_eq!(find_missing_data_indexes(slot, &db_ledger, 4, 3, 1), empty); assert_eq!(find_missing_data_indexes(slot, &db_ledger, 1, 2, 0), empty); - let shared_blob = &make_tiny_test_entries(1).to_blobs()[0]; + let shared_blob = &make_tiny_test_entries(1).to_shared_blobs()[0]; let first_index = 10; { let mut bl = shared_blob.write().unwrap(); @@ -542,7 +544,7 @@ mod test { // Insert one blob at index = first_index db_ledger - .write_blobs(&vec![&*shared_blob.read().unwrap()]) + .write_blobs(&vec![(*shared_blob.read().unwrap()).borrow()]) .unwrap(); // The first blob has index = first_index. Thus, for i < first_index, @@ -575,14 +577,11 @@ mod test { let gap = 10; assert!(gap > 3); let num_entries = 10; - let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); - for (i, b) in shared_blobs.iter().enumerate() { - let mut w_b = b.write().unwrap(); - w_b.set_index(i as u64 * gap).unwrap(); - w_b.set_slot(slot).unwrap(); + let mut blobs = make_tiny_test_entries(num_entries).to_blobs(); + for (i, b) in blobs.iter_mut().enumerate() { + b.set_index(i as u64 * gap).unwrap(); + b.set_slot(slot).unwrap(); } - let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); - let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); db_ledger.write_blobs(&blobs).unwrap(); // Index of the first blob is 0 @@ -661,7 +660,7 @@ mod test { // Write entries let num_entries = 10; - let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); + let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs(); index_blobs( shared_blobs.iter().zip(vec![slot; num_entries].into_iter()), @@ -716,12 +715,19 @@ mod test { try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt"); window[erase_offset].data = erased_data; - let data_blobs: Vec<_> = window[erase_offset..end_index] - .iter() - .map(|slot| slot.data.clone().unwrap()) - .collect(); - let (expected, _) = reconstruct_entries_from_blobs(data_blobs).unwrap(); - assert_eq!(consume_queue, expected); + { + let data_blobs: Vec<_> = window[erase_offset..end_index] + .iter() + .map(|slot| slot.data.clone().unwrap()) + .collect(); + + let locks: Vec<_> = data_blobs.iter().map(|blob| blob.read().unwrap()).collect(); + + let locked_data: Vec<&Blob> = locks.iter().map(|lock| &**lock).collect(); + + let (expected, _) = reconstruct_entries_from_blobs(locked_data).unwrap(); + assert_eq!(consume_queue, expected); + } let erased_coding_l = erased_coding.read().unwrap(); assert_eq!( @@ -750,7 +756,7 @@ mod test { let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); let num_entries = 10; let original_entries = make_tiny_test_entries(num_entries); - let shared_blobs = original_entries.clone().to_blobs(); + let shared_blobs = original_entries.clone().to_shared_blobs(); index_blobs( shared_blobs diff --git a/src/entry.rs b/src/entry.rs index 5737ee494b..bd2e7013e3 100644 --- a/src/entry.rs +++ b/src/entry.rs @@ -2,15 +2,17 @@ //! unique ID that is the hash of the Entry before it, plus the hash of the //! transactions within it. Entries cannot be reordered, and its field `num_hashes` //! represents an approximate amount of time since the last Entry was created. -use crate::packet::{SharedBlob, BLOB_DATA_SIZE}; +use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE}; use crate::poh::Poh; use crate::result::Result; use bincode::{deserialize, serialize_into, serialized_size}; use solana_sdk::hash::Hash; use solana_sdk::transaction::Transaction; +use std::borrow::Borrow; use std::io::Cursor; use std::mem::size_of; use std::sync::mpsc::{Receiver, Sender}; +use std::sync::{Arc, RwLock}; pub type EntrySender = Sender>; pub type EntryReceiver = Receiver>; @@ -98,17 +100,19 @@ impl Entry { entry } - pub fn to_blob(&self) -> SharedBlob { - let blob = SharedBlob::default(); - { - let mut blob_w = blob.write().unwrap(); - let pos = { - let mut out = Cursor::new(blob_w.data_mut()); - serialize_into(&mut out, &self).expect("failed to serialize output"); - out.position() as usize - }; - blob_w.set_size(pos); - } + pub fn to_shared_blob(&self) -> SharedBlob { + let blob = self.to_blob(); + Arc::new(RwLock::new(blob)) + } + + pub fn to_blob(&self) -> Blob { + let mut blob = Blob::default(); + let pos = { + let mut out = Cursor::new(blob.data_mut()); + serialize_into(&mut out, &self).expect("failed to serialize output"); + out.position() as usize + }; + blob.set_size(pos); blob } @@ -224,15 +228,18 @@ fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) - } } -pub fn reconstruct_entries_from_blobs(blobs: Vec) -> Result<(Vec, u64)> { - let mut entries: Vec = Vec::with_capacity(blobs.len()); +pub fn reconstruct_entries_from_blobs(blobs: I) -> Result<(Vec, u64)> +where + I: IntoIterator, + I::Item: Borrow, +{ + let mut entries: Vec = vec![]; let mut num_ticks = 0; - for blob in blobs { + for blob in blobs.into_iter() { let entry: Entry = { - let msg = blob.read().unwrap(); - let msg_size = msg.size()?; - deserialize(&msg.data()[..msg_size]).expect("Error reconstructing entry") + let msg_size = blob.borrow().size()?; + deserialize(&blob.borrow().data()[..msg_size])? }; if entry.is_tick() { diff --git a/src/erasure.rs b/src/erasure.rs index f493df02c4..fb423cf980 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -770,7 +770,7 @@ pub mod test { WINDOW_SIZE ]; let entries = make_tiny_test_entries(num_blobs); - let blobs = entries.to_blobs(); + let blobs = entries.to_shared_blobs(); { // Make some dummy slots diff --git a/src/ledger.rs b/src/ledger.rs index 40f185fa90..8a2176a0f8 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -4,7 +4,7 @@ use crate::entry::Entry; use crate::mint::Mint; -use crate::packet::{SharedBlob, BLOB_DATA_SIZE}; +use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE}; use bincode::{self, deserialize_from, serialize_into, serialized_size}; use chrono::prelude::Utc; use log::Level::Trace; @@ -450,7 +450,8 @@ pub fn read_ledger( pub trait Block { /// Verifies the hashes and counts of a slice of transactions are all consistent. fn verify(&self, start_hash: &Hash) -> bool; - fn to_blobs(&self) -> Vec; + fn to_shared_blobs(&self) -> Vec; + fn to_blobs(&self) -> Vec; fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>; } @@ -477,10 +478,14 @@ impl Block for [Entry] { }) } - fn to_blobs(&self) -> Vec { + fn to_blobs(&self) -> Vec { self.iter().map(|entry| entry.to_blob()).collect() } + fn to_shared_blobs(&self) -> Vec { + self.iter().map(|entry| entry.to_shared_blob()).collect() + } + fn votes(&self) -> Vec<(Pubkey, Vote, Hash)> { self.iter() .flat_map(|entry| { @@ -704,7 +709,7 @@ pub fn make_consecutive_blobs( ) -> Vec { let entries = create_ticks(num_blobs_to_make as usize, start_hash); - let blobs = entries.to_blobs(); + let blobs = entries.to_shared_blobs(); let mut index = start_height; for blob in &blobs { let mut blob = blob.write().unwrap(); @@ -773,7 +778,7 @@ mod tests { } #[test] - fn test_entries_to_blobs() { + fn test_entries_to_shared_blobs() { solana_logger::setup(); let entries = make_test_entries(); diff --git a/src/packet.rs b/src/packet.rs index abfcfed65b..043b8cd18a 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -232,21 +232,18 @@ pub fn to_packets(xs: &[T]) -> Vec { to_packets_chunked(xs, NUM_PACKETS) } -pub fn to_blob(resp: T, rsp_addr: SocketAddr) -> Result { - let blob = SharedBlob::default(); - { - let mut b = blob.write().unwrap(); - let v = serialize(&resp)?; - let len = v.len(); - assert!(len <= BLOB_SIZE); - b.data[..len].copy_from_slice(&v); - b.meta.size = len; - b.meta.set_addr(&rsp_addr); - } - Ok(blob) +pub fn to_blob(resp: T, rsp_addr: SocketAddr) -> Result { + let mut b = Blob::default(); + let v = serialize(&resp)?; + let len = v.len(); + assert!(len <= BLOB_SIZE); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); + Ok(b) } -pub fn to_blobs(rsps: Vec<(T, SocketAddr)>) -> Result { +pub fn to_blobs(rsps: Vec<(T, SocketAddr)>) -> Result> { let mut blobs = Vec::new(); for (resp, rsp_addr) in rsps { blobs.push(to_blob(resp, rsp_addr)?); @@ -254,6 +251,19 @@ pub fn to_blobs(rsps: Vec<(T, SocketAddr)>) -> Result Ok(blobs) } +pub fn to_shared_blob(resp: T, rsp_addr: SocketAddr) -> Result { + let blob = Arc::new(RwLock::new(to_blob(resp, rsp_addr)?)); + Ok(blob) +} + +pub fn to_shared_blobs(rsps: Vec<(T, SocketAddr)>) -> Result { + let mut blobs = Vec::new(); + for (resp, rsp_addr) in rsps { + blobs.push(to_shared_blob(resp, rsp_addr)?); + } + Ok(blobs) +} + const BLOB_SLOT_END: usize = size_of::(); const BLOB_INDEX_END: usize = BLOB_SLOT_END + size_of::(); const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::(); @@ -302,7 +312,7 @@ impl Blob { Ok(()) } /// sender id, we use this for identifying if its a blob from the leader that we should - /// retransmit. eventually blobs should have a signature that we can use ffor spam filtering + /// retransmit. eventually blobs should have a signature that we can use for spam filtering pub fn id(&self) -> Result { let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?; Ok(e) diff --git a/src/window.rs b/src/window.rs index 24cdfcc5b2..36babc4ebe 100644 --- a/src/window.rs +++ b/src/window.rs @@ -2,14 +2,11 @@ //! use crate::cluster_info::ClusterInfo; use crate::counter::Counter; -use crate::entry::reconstruct_entries_from_blobs; -use crate::entry::Entry; use crate::leader_scheduler::LeaderScheduler; use crate::packet::SharedBlob; use log::Level; use solana_sdk::pubkey::Pubkey; use std::cmp; -use std::mem; use std::net::SocketAddr; use std::sync::atomic::AtomicUsize; use std::sync::{Arc, RwLock}; @@ -63,18 +60,6 @@ pub trait WindowUtil { fn print(&self, id: &Pubkey, consumed: u64) -> String; - fn process_blob( - &mut self, - id: &Pubkey, - blob: SharedBlob, - pix: u64, - consume_queue: &mut Vec, - consumed: &mut u64, - tick_height: &mut u64, - leader_unknown: bool, - pending_retransmits: &mut bool, - ); - fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool; } @@ -256,110 +241,6 @@ impl WindowUtil for Window { buf.join("") ) } - - /// process a blob: Add blob to the window. If a continuous set of blobs - /// starting from consumed is thereby formed, add that continuous - /// range of blobs to a queue to be sent on to the next stage. - /// - /// * `self` - the window we're operating on - /// * `id` - this node's id - /// * `blob` - the blob to be processed into the window and rebroadcast - /// * `pix` - the index of the blob, corresponds to - /// the entry height of this blob - /// * `consume_queue` - output, blobs to be rebroadcast are placed here - /// * `consumed` - input/output, the entry-height to which this - /// node has populated and rebroadcast entries - fn process_blob( - &mut self, - id: &Pubkey, - blob: SharedBlob, - pix: u64, - consume_queue: &mut Vec, - consumed: &mut u64, - tick_height: &mut u64, - leader_unknown: bool, - pending_retransmits: &mut bool, - ) { - let w = (pix % self.window_size()) as usize; - - let is_coding = blob.read().unwrap().is_coding(); - - // insert a newly received blob into a window slot, clearing out and recycling any previous - // blob unless the incoming blob is a duplicate (based on idx) - // returns whether the incoming is a duplicate blob - fn insert_blob_is_dup( - id: &Pubkey, - blob: SharedBlob, - pix: u64, - window_slot: &mut Option, - c_or_d: &str, - ) -> bool { - if let Some(old) = mem::replace(window_slot, Some(blob)) { - let is_dup = old.read().unwrap().index().unwrap() == pix; - trace!( - "{}: occupied {} window slot {:}, is_dup: {}", - id, - c_or_d, - pix, - is_dup - ); - is_dup - } else { - trace!("{}: empty {} window slot {:}", id, c_or_d, pix); - false - } - } - - // insert the new blob into the window, overwrite and recycle old (or duplicate) entry - let is_duplicate = if is_coding { - insert_blob_is_dup(id, blob, pix, &mut self[w].coding, "coding") - } else { - insert_blob_is_dup(id, blob, pix, &mut self[w].data, "data") - }; - - if is_duplicate { - return; - } - - self[w].leader_unknown = leader_unknown; - *pending_retransmits = true; - - // push all contiguous blobs into consumed queue, increment consumed - loop { - let k = (*consumed % self.window_size()) as usize; - trace!("{}: k: {} consumed: {}", id, k, *consumed,); - - let k_data_blob; - let k_data_slot = &mut self[k].data; - if let Some(blob) = k_data_slot { - if blob.read().unwrap().index().unwrap() < *consumed { - // window wrap-around, end of received - break; - } - k_data_blob = (*blob).clone(); - } else { - // self[k].data is None, end of received - break; - } - - // Check that we can get the entries from this blob - match reconstruct_entries_from_blobs(vec![k_data_blob]) { - Ok((entries, num_ticks)) => { - consume_queue.extend(entries); - *tick_height += num_ticks; - } - Err(_) => { - // If the blob can't be deserialized, then remove it from the - // window and exit. *k_data_slot cannot be None at this point, - // so it's safe to unwrap. - k_data_slot.take(); - break; - } - } - - *consumed += 1; - } - } } fn calculate_max_repair( diff --git a/tests/multinode.rs b/tests/multinode.rs index a2ebb7d4a1..f2a2b01c66 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -1609,9 +1609,10 @@ fn test_broadcast_last_tick() { break; } } - let actual_last_tick = &reconstruct_entries_from_blobs(vec![last_tick_blob]) - .expect("Expected to be able to reconstruct entries from blob") - .0[0]; + let actual_last_tick = + &reconstruct_entries_from_blobs(vec![&*last_tick_blob.read().unwrap()]) + .expect("Expected to be able to reconstruct entries from blob") + .0[0]; assert_eq!(actual_last_tick, &expected_last_tick); }