Make reconstruct_entries_from_blobs() support Blobs and borrowed SharedBlobs, make distinction between to_blobs and to_shared_blobs (#2270)

This commit is contained in:
carllin 2018-12-22 19:30:30 -08:00 committed by GitHub
parent 2c9607d5da
commit 58a4905916
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 135 additions and 217 deletions

View File

@ -44,7 +44,7 @@ fn setup_read_bench(
entries.extend(make_tiny_test_entries(num_small_blobs as usize));
// Convert the entries to blobs, write the blobs to the ledger
let shared_blobs = entries.to_blobs();
let shared_blobs = entries.to_shared_blobs();
for b in shared_blobs.iter() {
b.write().unwrap().set_slot(slot).unwrap();
}
@ -60,7 +60,7 @@ fn bench_write_small(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path("bench_write_small");
let num_entries = 32 * 1024;
let entries = make_tiny_test_entries(num_entries);
let shared_blobs = entries.to_blobs();
let shared_blobs = entries.to_shared_blobs();
let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
bench_write_blobs(bench, &mut blobs, &ledger_path);
@ -73,7 +73,7 @@ fn bench_write_big(bench: &mut Bencher) {
let ledger_path = get_tmp_ledger_path("bench_write_big");
let num_entries = 32 * 1024;
let entries = make_tiny_test_entries(num_entries);
let shared_blobs = entries.to_blobs();
let shared_blobs = entries.to_shared_blobs();
let mut blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.write().unwrap()).collect();
let mut blobs: Vec<&mut Blob> = blob_locks.iter_mut().map(|b| &mut **b).collect();
bench_write_blobs(bench, &mut blobs, &ledger_path);
@ -147,7 +147,7 @@ fn bench_insert_data_blob_small(bench: &mut Bencher) {
DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_tiny_test_entries(num_entries);
let mut shared_blobs = entries.to_blobs();
let mut shared_blobs = entries.to_shared_blobs();
shared_blobs.shuffle(&mut thread_rng());
bench.iter(move || {
@ -172,7 +172,7 @@ fn bench_insert_data_blob_big(bench: &mut Bencher) {
DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
let num_entries = 32 * 1024;
let entries = make_large_test_entries(num_entries);
let mut shared_blobs = entries.to_blobs();
let mut shared_blobs = entries.to_shared_blobs();
shared_blobs.shuffle(&mut thread_rng());
bench.iter(move || {

View File

@ -72,7 +72,7 @@ fn broadcast(
let blobs: Vec<_> = ventries
.into_par_iter()
.flat_map(|p| p.to_blobs())
.flat_map(|p| p.to_shared_blobs())
.collect();
let blobs_slot_heights: Vec<(SharedBlob, u64)> = blobs.into_iter().zip(slot_heights).collect();

View File

@ -20,7 +20,7 @@ use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
use crate::crds_value::{CrdsValue, CrdsValueLabel, LeaderId};
use crate::db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT};
use crate::packet::{to_blob, Blob, SharedBlob, BLOB_SIZE};
use crate::packet::{to_shared_blob, Blob, SharedBlob, BLOB_SIZE};
use crate::result::Result;
use crate::rpc::RPC_PORT;
use crate::streamer::{BlobReceiver, BlobSender};
@ -602,7 +602,7 @@ impl ClusterInfo {
let reqs = obj.write().unwrap().gossip_request();
let blobs = reqs
.into_iter()
.filter_map(|(remote_gossip_addr, req)| to_blob(req, remote_gossip_addr).ok())
.filter_map(|(remote_gossip_addr, req)| to_shared_blob(req, remote_gossip_addr).ok())
.collect();
blob_sender.send(blobs)?;
Ok(())
@ -760,7 +760,7 @@ impl ClusterInfo {
from.gossip = *from_addr;
}
inc_new_counter_info!("cluster_info-pull_request-rsp", len);
to_blob(rsp, from.gossip).ok().into_iter().collect()
to_shared_blob(rsp, from.gossip).ok().into_iter().collect()
}
}
fn handle_pull_response(me: &Arc<RwLock<Self>>, from: Pubkey, data: Vec<CrdsValue>) {
@ -805,13 +805,15 @@ impl ClusterInfo {
};
prune_msg.sign(&me.read().unwrap().keypair);
let rsp = Protocol::PruneMessage(self_id, prune_msg);
to_blob(rsp, ci.gossip).ok()
to_shared_blob(rsp, ci.gossip).ok()
})
.into_iter()
.collect();
let mut blobs: Vec<_> = pushes
.into_iter()
.filter_map(|(remote_gossip_addr, req)| to_blob(req, remote_gossip_addr).ok())
.filter_map(|(remote_gossip_addr, req)| {
to_shared_blob(req, remote_gossip_addr).ok()
})
.collect();
rsp.append(&mut blobs);
rsp

View File

@ -354,9 +354,10 @@ impl DbLedger {
pub fn write_blobs<'a, I>(&self, blobs: I) -> Result<Vec<Entry>>
where
I: IntoIterator<Item = &'a &'a Blob>,
I: IntoIterator,
I::Item: Borrow<&'a Blob>,
{
let blobs = blobs.into_iter().cloned();
let blobs = blobs.into_iter().map(|b| *b.borrow());
let new_entries = self.insert_data_blobs(blobs)?;
Ok(new_entries)
}
@ -366,17 +367,18 @@ impl DbLedger {
I: IntoIterator,
I::Item: Borrow<Entry>,
{
let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
let b = entry.borrow().to_blob();
{
let mut w_b = b.write().unwrap();
w_b.set_index(idx as u64).unwrap();
w_b.set_slot(slot).unwrap();
}
b
});
let blobs: Vec<_> = entries
.into_iter()
.enumerate()
.map(|(idx, entry)| {
let mut b = entry.borrow().to_blob();
b.set_index(idx as u64).unwrap();
b.set_slot(slot).unwrap();
b
})
.collect();
self.write_shared_blobs(shared_blobs)
self.write_blobs(&blobs)
}
pub fn insert_data_blobs<I>(&self, new_blobs: I) -> Result<Vec<Entry>>
@ -700,15 +702,19 @@ where
let db_ledger = DbLedger::open(ledger_path)?;
// TODO sign these blobs with keypair
let blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
let b = entry.borrow().to_blob();
b.write().unwrap().set_index(idx as u64).unwrap();
b.write().unwrap().set_id(&keypair.pubkey()).unwrap();
b.write().unwrap().set_slot(DEFAULT_SLOT_HEIGHT).unwrap();
b
});
let blobs: Vec<_> = entries
.into_iter()
.enumerate()
.map(|(idx, entry)| {
let mut b = entry.borrow().to_blob();
b.set_index(idx as u64).unwrap();
b.set_id(&keypair.pubkey()).unwrap();
b.set_slot(DEFAULT_SLOT_HEIGHT).unwrap();
b
})
.collect();
db_ledger.write_shared_blobs(blobs)?;
db_ledger.write_blobs(&blobs[..])?;
Ok(())
}
@ -768,7 +774,7 @@ mod tests {
#[test]
fn test_get_blobs_bytes() {
let shared_blobs = make_tiny_test_entries(10).to_blobs();
let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
let slot = DEFAULT_SLOT_HEIGHT;
index_blobs(
shared_blobs.iter().zip(vec![slot; 10].into_iter()),
@ -838,7 +844,7 @@ mod tests {
#[test]
fn test_insert_data_blobs_basic() {
let entries = make_tiny_test_entries(2);
let shared_blobs = entries.to_blobs();
let shared_blobs = entries.to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64).unwrap();
@ -882,7 +888,7 @@ mod tests {
fn test_insert_data_blobs_multiple() {
let num_blobs = 10;
let entries = make_tiny_test_entries(num_blobs);
let shared_blobs = entries.to_blobs();
let shared_blobs = entries.to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64).unwrap();
}
@ -919,7 +925,7 @@ mod tests {
fn test_insert_data_blobs_slots() {
let num_blobs = 10;
let entries = make_tiny_test_entries(num_blobs);
let shared_blobs = entries.to_blobs();
let shared_blobs = entries.to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64).unwrap();
}
@ -966,7 +972,7 @@ mod tests {
// Write entries
let num_entries = 8;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
let mut w_b = b.write().unwrap();
@ -1009,7 +1015,7 @@ mod tests {
// Write entries
let num_entries = 20 as u64;
let original_entries = make_tiny_test_entries(num_entries as usize);
let shared_blobs = original_entries.clone().to_blobs();
let shared_blobs = original_entries.clone().to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
let mut w_b = b.write().unwrap();
w_b.set_index(i as u64).unwrap();
@ -1055,7 +1061,7 @@ mod tests {
.flat_map(|e| vec![e; num_duplicates])
.collect();
let shared_blobs = original_entries.clone().to_blobs();
let shared_blobs = original_entries.clone().to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
let index = (i / 2) as u64;
let mut w_b = b.write().unwrap();
@ -1106,7 +1112,7 @@ mod tests {
// Write entries
let num_entries = 20 as u64;
let original_entries = make_tiny_test_entries(num_entries as usize);
let shared_blobs = original_entries.to_blobs();
let shared_blobs = original_entries.to_shared_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
let mut w_b = b.write().unwrap();
w_b.set_index(i as u64).unwrap();

View File

@ -12,6 +12,7 @@ use crate::streamer::BlobSender;
use log::Level;
use solana_metrics::{influxdb, submit};
use solana_sdk::pubkey::Pubkey;
use std::borrow::Borrow;
use std::cmp;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
@ -299,7 +300,7 @@ pub fn process_blob(
.put(&erasure_key, &rblob.data[..BLOB_HEADER_SIZE + size])?;
vec![]
} else {
db_ledger.insert_data_blobs(vec![&*blob.read().unwrap()])?
db_ledger.insert_data_blobs(vec![(*blob.read().unwrap()).borrow()])?
};
#[cfg(feature = "erasure")]
@ -402,6 +403,7 @@ mod test {
use crate::packet::{index_blobs, Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
use crate::streamer::{receiver, responder, PacketReceiver};
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::borrow::Borrow;
use std::io;
use std::io::Write;
use std::net::UdpSocket;
@ -532,7 +534,7 @@ mod test {
assert_eq!(find_missing_data_indexes(slot, &db_ledger, 4, 3, 1), empty);
assert_eq!(find_missing_data_indexes(slot, &db_ledger, 1, 2, 0), empty);
let shared_blob = &make_tiny_test_entries(1).to_blobs()[0];
let shared_blob = &make_tiny_test_entries(1).to_shared_blobs()[0];
let first_index = 10;
{
let mut bl = shared_blob.write().unwrap();
@ -542,7 +544,7 @@ mod test {
// Insert one blob at index = first_index
db_ledger
.write_blobs(&vec![&*shared_blob.read().unwrap()])
.write_blobs(&vec![(*shared_blob.read().unwrap()).borrow()])
.unwrap();
// The first blob has index = first_index. Thus, for i < first_index,
@ -575,14 +577,11 @@ mod test {
let gap = 10;
assert!(gap > 3);
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
for (i, b) in shared_blobs.iter().enumerate() {
let mut w_b = b.write().unwrap();
w_b.set_index(i as u64 * gap).unwrap();
w_b.set_slot(slot).unwrap();
let mut blobs = make_tiny_test_entries(num_entries).to_blobs();
for (i, b) in blobs.iter_mut().enumerate() {
b.set_index(i as u64 * gap).unwrap();
b.set_slot(slot).unwrap();
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
db_ledger.write_blobs(&blobs).unwrap();
// Index of the first blob is 0
@ -661,7 +660,7 @@ mod test {
// Write entries
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
index_blobs(
shared_blobs.iter().zip(vec![slot; num_entries].into_iter()),
@ -716,12 +715,19 @@ mod test {
try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt");
window[erase_offset].data = erased_data;
let data_blobs: Vec<_> = window[erase_offset..end_index]
.iter()
.map(|slot| slot.data.clone().unwrap())
.collect();
let (expected, _) = reconstruct_entries_from_blobs(data_blobs).unwrap();
assert_eq!(consume_queue, expected);
{
let data_blobs: Vec<_> = window[erase_offset..end_index]
.iter()
.map(|slot| slot.data.clone().unwrap())
.collect();
let locks: Vec<_> = data_blobs.iter().map(|blob| blob.read().unwrap()).collect();
let locked_data: Vec<&Blob> = locks.iter().map(|lock| &**lock).collect();
let (expected, _) = reconstruct_entries_from_blobs(locked_data).unwrap();
assert_eq!(consume_queue, expected);
}
let erased_coding_l = erased_coding.read().unwrap();
assert_eq!(
@ -750,7 +756,7 @@ mod test {
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
let num_entries = 10;
let original_entries = make_tiny_test_entries(num_entries);
let shared_blobs = original_entries.clone().to_blobs();
let shared_blobs = original_entries.clone().to_shared_blobs();
index_blobs(
shared_blobs

View File

@ -2,15 +2,17 @@
//! unique ID that is the hash of the Entry before it, plus the hash of the
//! transactions within it. Entries cannot be reordered, and its field `num_hashes`
//! represents an approximate amount of time since the last Entry was created.
use crate::packet::{SharedBlob, BLOB_DATA_SIZE};
use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE};
use crate::poh::Poh;
use crate::result::Result;
use bincode::{deserialize, serialize_into, serialized_size};
use solana_sdk::hash::Hash;
use solana_sdk::transaction::Transaction;
use std::borrow::Borrow;
use std::io::Cursor;
use std::mem::size_of;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, RwLock};
pub type EntrySender = Sender<Vec<Entry>>;
pub type EntryReceiver = Receiver<Vec<Entry>>;
@ -98,17 +100,19 @@ impl Entry {
entry
}
pub fn to_blob(&self) -> SharedBlob {
let blob = SharedBlob::default();
{
let mut blob_w = blob.write().unwrap();
let pos = {
let mut out = Cursor::new(blob_w.data_mut());
serialize_into(&mut out, &self).expect("failed to serialize output");
out.position() as usize
};
blob_w.set_size(pos);
}
/// Serializes this entry into a fresh `Blob` and wraps it in the
/// shared, lock-guarded form used by the networking code.
pub fn to_shared_blob(&self) -> SharedBlob {
    Arc::new(RwLock::new(self.to_blob()))
}
/// Serializes this entry into an owned `Blob`.
///
/// Panics if bincode serialization into the blob's data buffer fails
/// (e.g. the encoded entry does not fit).
pub fn to_blob(&self) -> Blob {
    let mut blob = Blob::default();
    // Scope the cursor so its mutable borrow of `blob` ends before
    // `set_size` takes another mutable borrow.
    let written = {
        let mut cursor = Cursor::new(blob.data_mut());
        serialize_into(&mut cursor, &self).expect("failed to serialize output");
        cursor.position() as usize
    };
    blob.set_size(written);
    blob
}
@ -224,15 +228,18 @@ fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -
}
}
pub fn reconstruct_entries_from_blobs(blobs: Vec<SharedBlob>) -> Result<(Vec<Entry>, u64)> {
let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());
pub fn reconstruct_entries_from_blobs<I>(blobs: I) -> Result<(Vec<Entry>, u64)>
where
I: IntoIterator,
I::Item: Borrow<Blob>,
{
let mut entries: Vec<Entry> = vec![];
let mut num_ticks = 0;
for blob in blobs {
for blob in blobs.into_iter() {
let entry: Entry = {
let msg = blob.read().unwrap();
let msg_size = msg.size()?;
deserialize(&msg.data()[..msg_size]).expect("Error reconstructing entry")
let msg_size = blob.borrow().size()?;
deserialize(&blob.borrow().data()[..msg_size])?
};
if entry.is_tick() {

View File

@ -770,7 +770,7 @@ pub mod test {
WINDOW_SIZE
];
let entries = make_tiny_test_entries(num_blobs);
let blobs = entries.to_blobs();
let blobs = entries.to_shared_blobs();
{
// Make some dummy slots

View File

@ -4,7 +4,7 @@
use crate::entry::Entry;
use crate::mint::Mint;
use crate::packet::{SharedBlob, BLOB_DATA_SIZE};
use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE};
use bincode::{self, deserialize_from, serialize_into, serialized_size};
use chrono::prelude::Utc;
use log::Level::Trace;
@ -450,7 +450,8 @@ pub fn read_ledger(
pub trait Block {
/// Verifies the hashes and counts of a slice of transactions are all consistent.
fn verify(&self, start_hash: &Hash) -> bool;
fn to_blobs(&self) -> Vec<SharedBlob>;
fn to_shared_blobs(&self) -> Vec<SharedBlob>;
fn to_blobs(&self) -> Vec<Blob>;
fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>;
}
@ -477,10 +478,14 @@ impl Block for [Entry] {
})
}
fn to_blobs(&self) -> Vec<SharedBlob> {
fn to_blobs(&self) -> Vec<Blob> {
self.iter().map(|entry| entry.to_blob()).collect()
}
fn to_shared_blobs(&self) -> Vec<SharedBlob> {
self.iter().map(|entry| entry.to_shared_blob()).collect()
}
fn votes(&self) -> Vec<(Pubkey, Vote, Hash)> {
self.iter()
.flat_map(|entry| {
@ -704,7 +709,7 @@ pub fn make_consecutive_blobs(
) -> Vec<SharedBlob> {
let entries = create_ticks(num_blobs_to_make as usize, start_hash);
let blobs = entries.to_blobs();
let blobs = entries.to_shared_blobs();
let mut index = start_height;
for blob in &blobs {
let mut blob = blob.write().unwrap();
@ -773,7 +778,7 @@ mod tests {
}
#[test]
fn test_entries_to_blobs() {
fn test_entries_to_shared_blobs() {
solana_logger::setup();
let entries = make_test_entries();

View File

@ -232,21 +232,18 @@ pub fn to_packets<T: Serialize>(xs: &[T]) -> Vec<SharedPackets> {
to_packets_chunked(xs, NUM_PACKETS)
}
pub fn to_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<SharedBlob> {
let blob = SharedBlob::default();
{
let mut b = blob.write().unwrap();
let v = serialize(&resp)?;
let len = v.len();
assert!(len <= BLOB_SIZE);
b.data[..len].copy_from_slice(&v);
b.meta.size = len;
b.meta.set_addr(&rsp_addr);
}
Ok(blob)
/// Serializes `resp` into a fresh `Blob` addressed to `rsp_addr`.
///
/// Returns a serialization error if `resp` cannot be encoded; panics if the
/// encoded payload is larger than `BLOB_SIZE` (a fixed-size buffer — an
/// oversized payload is a caller bug, not a recoverable condition).
pub fn to_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<Blob> {
    let payload = serialize(&resp)?;
    let payload_len = payload.len();
    assert!(payload_len <= BLOB_SIZE);
    let mut blob = Blob::default();
    blob.data[..payload_len].copy_from_slice(&payload);
    blob.meta.size = payload_len;
    blob.meta.set_addr(&rsp_addr);
    Ok(blob)
}
pub fn to_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<SharedBlobs> {
pub fn to_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<Vec<Blob>> {
let mut blobs = Vec::new();
for (resp, rsp_addr) in rsps {
blobs.push(to_blob(resp, rsp_addr)?);
@ -254,6 +251,19 @@ pub fn to_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<SharedBlobs>
Ok(blobs)
}
/// Like `to_blob`, but wraps the resulting blob for shared, lock-guarded
/// access across threads.
pub fn to_shared_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<SharedBlob> {
    to_blob(resp, rsp_addr).map(|blob| Arc::new(RwLock::new(blob)))
}
/// Serializes each `(response, address)` pair into a `SharedBlob`,
/// short-circuiting on the first serialization error.
pub fn to_shared_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<SharedBlobs> {
    rsps.into_iter()
        .map(|(resp, rsp_addr)| to_shared_blob(resp, rsp_addr))
        .collect()
}
const BLOB_SLOT_END: usize = size_of::<u64>();
const BLOB_INDEX_END: usize = BLOB_SLOT_END + size_of::<u64>();
const BLOB_ID_END: usize = BLOB_INDEX_END + size_of::<Pubkey>();
@ -302,7 +312,7 @@ impl Blob {
Ok(())
}
/// sender id, we use this for identifying if its a blob from the leader that we should
/// retransmit. eventually blobs should have a signature that we can use ffor spam filtering
/// retransmit. eventually blobs should have a signature that we can use for spam filtering
pub fn id(&self) -> Result<Pubkey> {
let e = deserialize(&self.data[BLOB_INDEX_END..BLOB_ID_END])?;
Ok(e)

View File

@ -2,14 +2,11 @@
//!
use crate::cluster_info::ClusterInfo;
use crate::counter::Counter;
use crate::entry::reconstruct_entries_from_blobs;
use crate::entry::Entry;
use crate::leader_scheduler::LeaderScheduler;
use crate::packet::SharedBlob;
use log::Level;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::mem;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
@ -63,18 +60,6 @@ pub trait WindowUtil {
fn print(&self, id: &Pubkey, consumed: u64) -> String;
fn process_blob(
&mut self,
id: &Pubkey,
blob: SharedBlob,
pix: u64,
consume_queue: &mut Vec<Entry>,
consumed: &mut u64,
tick_height: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
);
fn blob_idx_in_window(&self, id: &Pubkey, pix: u64, consumed: u64, received: &mut u64) -> bool;
}
@ -256,110 +241,6 @@ impl WindowUtil for Window {
buf.join("")
)
}
/// process a blob: Add blob to the window. If a continuous set of blobs
/// starting from consumed is thereby formed, add that continuous
/// range of blobs to a queue to be sent on to the next stage.
///
/// * `self` - the window we're operating on
/// * `id` - this node's id
/// * `blob` - the blob to be processed into the window and rebroadcast
/// * `pix` - the index of the blob, corresponds to
/// the entry height of this blob
/// * `consume_queue` - output, blobs to be rebroadcast are placed here
/// * `consumed` - input/output, the entry-height to which this
/// node has populated and rebroadcast entries
fn process_blob(
&mut self,
id: &Pubkey,
blob: SharedBlob,
pix: u64,
consume_queue: &mut Vec<Entry>,
consumed: &mut u64,
tick_height: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
) {
// Map the blob's index onto a slot of the circular window.
let w = (pix % self.window_size()) as usize;
// Hold the read lock only long enough to classify the blob; the blob
// itself is moved into the window below.
let is_coding = blob.read().unwrap().is_coding();
// insert a newly received blob into a window slot, clearing out and recycling any previous
// blob unless the incoming blob is a duplicate (based on idx)
// returns whether the incoming is a duplicate blob
fn insert_blob_is_dup(
id: &Pubkey,
blob: SharedBlob,
pix: u64,
window_slot: &mut Option<SharedBlob>,
c_or_d: &str,
) -> bool {
// The new blob unconditionally replaces whatever occupied the slot;
// only afterwards do we check whether the evicted blob had the same
// index (i.e. the incoming blob was a duplicate).
if let Some(old) = mem::replace(window_slot, Some(blob)) {
let is_dup = old.read().unwrap().index().unwrap() == pix;
trace!(
"{}: occupied {} window slot {:}, is_dup: {}",
id,
c_or_d,
pix,
is_dup
);
is_dup
} else {
trace!("{}: empty {} window slot {:}", id, c_or_d, pix);
false
}
}
// insert the new blob into the window, overwrite and recycle old (or duplicate) entry
let is_duplicate = if is_coding {
insert_blob_is_dup(id, blob, pix, &mut self[w].coding, "coding")
} else {
insert_blob_is_dup(id, blob, pix, &mut self[w].data, "data")
};
if is_duplicate {
// Duplicate blobs do not advance `consumed` and need no retransmit.
return;
}
self[w].leader_unknown = leader_unknown;
*pending_retransmits = true;
// push all contiguous blobs into consumed queue, increment consumed
loop {
let k = (*consumed % self.window_size()) as usize;
trace!("{}: k: {} consumed: {}", id, k, *consumed,);
let k_data_blob;
let k_data_slot = &mut self[k].data;
if let Some(blob) = k_data_slot {
if blob.read().unwrap().index().unwrap() < *consumed {
// window wrap-around, end of received
break;
}
// Clone the shared handle so the window slot keeps its blob while
// we deserialize it below.
k_data_blob = (*blob).clone();
} else {
// self[k].data is None, end of received
break;
}
// Check that we can get the entries from this blob
match reconstruct_entries_from_blobs(vec![k_data_blob]) {
Ok((entries, num_ticks)) => {
consume_queue.extend(entries);
*tick_height += num_ticks;
}
Err(_) => {
// If the blob can't be deserialized, then remove it from the
// window and exit. *k_data_slot cannot be None at this point,
// so it's safe to unwrap.
k_data_slot.take();
break;
}
}
*consumed += 1;
}
}
}
fn calculate_max_repair(

View File

@ -1609,9 +1609,10 @@ fn test_broadcast_last_tick() {
break;
}
}
let actual_last_tick = &reconstruct_entries_from_blobs(vec![last_tick_blob])
.expect("Expected to be able to reconstruct entries from blob")
.0[0];
let actual_last_tick =
&reconstruct_entries_from_blobs(vec![&*last_tick_blob.read().unwrap()])
.expect("Expected to be able to reconstruct entries from blob")
.0[0];
assert_eq!(actual_last_tick, &expected_last_tick);
}