Reintroduce leader_id to blobs (#2986)

This commit is contained in:
Sagar Dhawan 2019-02-27 13:37:08 -08:00 committed by GitHub
parent e45559a1a7
commit 3a20a20807
12 changed files with 67 additions and 90 deletions

View File

@ -1253,7 +1253,7 @@ pub fn create_new_ledger(ledger_path: &str, genesis_block: &GenesisBlock) -> Res
Ok(entries.last().unwrap().id)
}
pub fn genesis<'a, I>(ledger_path: &str, entries: I) -> Result<()>
pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Result<()>
where
I: IntoIterator<Item = &'a Entry>,
{
@ -1267,6 +1267,7 @@ where
let mut b = entry.borrow().to_blob();
b.set_index(idx as u64);
b.forward(true);
b.set_id(&keypair.pubkey());
b.set_slot(0);
b
})
@ -1486,7 +1487,7 @@ pub mod tests {
fn test_read_blobs_bytes() {
let shared_blobs = make_tiny_test_entries(10).to_shared_blobs();
let slot = 0;
index_blobs(&shared_blobs, &mut 0, slot);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@ -1850,7 +1851,7 @@ pub mod tests {
let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");
{
genesis(&ledger_path, &entries).unwrap();
genesis(&ledger_path, &Keypair::new(), &entries).unwrap();
let ledger = Blocktree::open(&ledger_path).expect("open failed");
@ -1868,7 +1869,7 @@ pub mod tests {
let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");
{
// put entries except last 2 into ledger
genesis(&ledger_path, &entries[..entries.len() - 2]).unwrap();
genesis(&ledger_path, &Keypair::new(), &entries[..entries.len() - 2]).unwrap();
let ledger = Blocktree::open(&ledger_path).expect("open failed");
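With these changes, every blob written at genesis is stamped with the writer's pubkey. A minimal usage sketch, assuming the surrounding module's `Entry` and `Result` types and the 2019-era `KeypairUtil` trait that supplies `pubkey()`:

```rust
use solana_sdk::signature::{Keypair, KeypairUtil};

// Hedged sketch: write a test ledger through the updated genesis()
// signature. The keypair only stamps each blob's sender id, so the
// tests above just pass a throwaway Keypair::new().
fn write_test_ledger(ledger_path: &str, entries: &[Entry]) -> Result<()> {
    let keypair = Keypair::new(); // throwaway identity for tests
    genesis(ledger_path, &keypair, entries)
}
```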

View File

@ -74,7 +74,7 @@ impl Broadcast {
.collect();
// TODO: blob_index should be slot-relative...
index_blobs(&blobs, &mut self.blob_index, slot_height);
index_blobs(&blobs, &self.id, &mut self.blob_index, slot_height);
let parent = {
if slot_height == 0 {
0

View File

@ -161,7 +161,7 @@ mod tests {
use bs58;
// golden needs to be updated if the blob format changes; adding the sender id field changed the serialized layout
let golden = Hash::new(
&bs58::decode("3hY6c5V5R6ho3k5KBYkDg2nZTtkuBpvu9n421nGntHrg")
&bs58::decode("BCNVsE19CCpsvGseZTCEEM1qSiX1ridku2w155VveqEu")
.into_vec()
.unwrap(),
);

View File

@ -1,47 +1,26 @@
//! Set of functions for emulating the ledger window on top of the database ledger implementation
use crate::bank_forks::BankForks;
use crate::blocktree::*;
#[cfg(feature = "erasure")]
use crate::erasure;
use crate::leader_scheduler::LeaderScheduler;
use crate::packet::{SharedBlob, BLOB_HEADER_SIZE};
use crate::result::Result;
use crate::streamer::BlobSender;
use solana_metrics::counter::Counter;
use solana_metrics::{influxdb, submit};
use solana_sdk::pubkey::Pubkey;
use std::borrow::Borrow;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
pub const MAX_REPAIR_LENGTH: usize = 128;
pub fn retransmit_blobs(
dq: &[SharedBlob],
bank_forks: &Arc<RwLock<BankForks>>,
retransmit: &BlobSender,
id: &Pubkey,
) -> Result<()> {
pub fn retransmit_blobs(dq: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) -> Result<()> {
let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
let leader_scheduler = LeaderScheduler::default();
for b in dq {
let bank = bank_forks.read().unwrap().working_bank();
// Don't add blobs generated by this node to the retransmit queue
let slot = b.read().unwrap().slot();
if leader_scheduler.slot_leader_at(slot, &bank) != *id {
if b.read().unwrap().id() != *id {
retransmit_queue.push(b.clone());
}
}
//todo maybe move this to where retransmit is actually happening
submit(
influxdb::Point::new("retransmit-queue")
.add_field(
"count",
influxdb::Value::Integer(retransmit_queue.len() as i64),
)
.to_owned(),
);
if !retransmit_queue.is_empty() {
inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
retransmit.send(retransmit_queue)?;
@ -50,11 +29,7 @@ pub fn retransmit_blobs(
}
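This comparison is the core of the change: rather than asking a LeaderScheduler who led each slot, the node reads the sender id carried in the blob itself and skips anything it originated. A standalone sketch of that filter, with plain types standing in for `SharedBlob` and `Pubkey`:

```rust
// Standalone sketch of the self-origin filter above; a [u8; 32] array
// stands in for Pubkey and a plain struct for SharedBlob.
#[derive(Clone)]
struct Blob {
    sender: [u8; 32], // the id field this commit reintroduces
}

fn build_retransmit_queue(dq: &[Blob], my_id: &[u8; 32]) -> Vec<Blob> {
    dq.iter()
        .filter(|b| &b.sender != my_id) // don't retransmit our own blobs
        .cloned()
        .collect()
}

fn main() {
    let me = [1u8; 32];
    let peer = [2u8; 32];
    let dq = vec![Blob { sender: me }, Blob { sender: peer }];
    // Only the peer's blob survives the filter.
    assert_eq!(build_retransmit_queue(&dq, &me).len(), 1);
}
```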
/// Process a blob: Add blob to the ledger window.
pub fn process_blob(
bank_forks: &Arc<RwLock<BankForks>>,
blocktree: &Arc<Blocktree>,
blob: &SharedBlob,
) -> Result<()> {
pub fn process_blob(blocktree: &Arc<Blocktree>, blob: &SharedBlob) -> Result<()> {
let is_coding = blob.read().unwrap().is_coding();
// Check if the blob is in the range of our known leaders. If not, we return.
@ -62,8 +37,6 @@ pub fn process_blob(
let r_blob = blob.read().unwrap();
(r_blob.slot(), r_blob.index())
};
let bank = bank_forks.read().unwrap().working_bank();
let _leader = LeaderScheduler::default().slot_leader_at(slot, &bank);
// TODO: Once the original leader signature is added to the blob, make sure that
// the blob was originally generated by the expected leader for this slot
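A hedged sketch of the check that TODO anticipates; nothing below exists in this commit, the helper name is invented, and it leans on the `Blob` accessors and `Pubkey` type used elsewhere in this diff. Until blobs are signed, the id can only be compared, not cryptographically verified:

```rust
// Hypothetical future check (invented name): confirm a blob's claimed
// origin matches the expected leader for its slot. Once blobs carry a
// signature, this would verify it against the leader's key instead of
// trusting the unsigned id field.
fn blob_from_expected_leader(blob: &Blob, expected_leader: &Pubkey) -> bool {
    blob.id() == *expected_leader
}
```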
@ -124,14 +97,13 @@ mod test {
use crate::erasure::{NUM_CODING, NUM_DATA};
use crate::packet::{index_blobs, Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
use crate::streamer::{receiver, responder, PacketReceiver};
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::io;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::Duration;
fn get_msgs(r: PacketReceiver, num: &mut usize) {
@ -448,7 +420,7 @@ mod test {
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_shared_blobs();
index_blobs(&shared_blobs, &mut 0, slot);
index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, slot);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
@ -534,23 +506,14 @@ mod test {
fn test_process_blob() {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Arc::new(Blocktree::open(&blocktree_path).unwrap());
let (genesis_block, _) = GenesisBlock::new(100);
let bank0 = Bank::new(&genesis_block);
let bank_id = 0;
let bank_forks = BankForks::new(bank_id, bank0);
let num_entries = 10;
let original_entries = make_tiny_test_entries(num_entries);
let shared_blobs = original_entries.clone().to_shared_blobs();
index_blobs(&shared_blobs, &mut 0, 0);
let bank_forks = Arc::new(RwLock::new(bank_forks));
index_blobs(&shared_blobs, &Keypair::new().pubkey(), &mut 0, 0);
for blob in shared_blobs.iter().rev() {
process_blob(&bank_forks, &blocktree, blob)
.expect("Expect successful processing of blob");
process_blob(&blocktree, blob).expect("Expect successful processing of blob");
}
assert_eq!(

View File

@ -461,6 +461,7 @@ pub fn make_large_test_entries(num_entries: usize) -> Vec<Entry> {
#[cfg(test)]
pub fn make_consecutive_blobs(
id: &Pubkey,
num_blobs_to_make: u64,
start_height: u64,
start_hash: Hash,
@ -473,6 +474,7 @@ pub fn make_consecutive_blobs(
for blob in &blobs {
let mut blob = blob.write().unwrap();
blob.set_index(index);
blob.set_id(id);
blob.forward(true);
blob.meta.set_addr(addr);
index += 1;

View File

@ -329,6 +329,7 @@ impl CodingGenerator {
for data_blob in &data_locks[NUM_DATA - NUM_CODING..NUM_DATA] {
let index = data_blob.index();
let slot = data_blob.slot();
let id = data_blob.id();
let should_forward = data_blob.should_forward();
let coding_blob = SharedBlob::default();
@ -336,6 +337,7 @@ impl CodingGenerator {
let mut coding_blob = coding_blob.write().unwrap();
coding_blob.set_index(index);
coding_blob.set_slot(slot);
coding_blob.set_id(&id);
coding_blob.forward(should_forward);
coding_blob.set_size(max_data_size);
coding_blob.set_coding();
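Coding blobs must mirror the metadata of the data blobs they protect, or receivers would misattribute and misroute them; that now includes the sender id. A sketch of the copy the loop above performs, with an invented helper name and the `Blob` accessors shown elsewhere in this diff:

```rust
// Invented helper; the body restates what CodingGenerator does for each
// coding blob it emits from the last NUM_CODING data blobs.
fn mirror_data_blob_metadata(data: &Blob, coding: &mut Blob) {
    coding.set_index(data.index());
    coding.set_slot(data.slot());
    coding.set_id(&data.id()); // new in this commit: carry the sender id over
    coding.forward(data.should_forward());
}
```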
@ -509,6 +511,7 @@ pub mod test {
use crate::window::WindowSlot;
use rand::{thread_rng, Rng};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;
#[test]
@ -761,6 +764,7 @@ pub mod test {
let index = data_rl.index();
let slot = data_rl.slot();
let id = data_rl.id();
let should_forward = data_rl.should_forward();
trace!(
@ -771,6 +775,7 @@ pub mod test {
);
coding_wl.set_index(index);
coding_wl.set_slot(slot);
coding_wl.set_id(&id);
coding_wl.forward(should_forward);
}
coding_wl.set_size(max_data_size);
@ -889,7 +894,7 @@ pub mod test {
}
// Make some dummy slots
index_blobs(&blobs, &mut (offset as u64), slot);
index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), slot);
for b in blobs {
let idx = b.read().unwrap().index() as usize % WINDOW_SIZE;
@ -902,7 +907,7 @@ pub mod test {
fn generate_test_blobs(offset: usize, num_blobs: usize) -> Vec<SharedBlob> {
let blobs = make_tiny_test_entries(num_blobs).to_shared_blobs();
index_blobs(&blobs, &mut (offset as u64), 0);
index_blobs(&blobs, &Keypair::new().pubkey(), &mut (offset as u64), 0);
blobs
}

View File

@ -722,11 +722,16 @@ mod tests {
let tvu_address = &validator_info.tvu;
let msgs =
make_consecutive_blobs(blobs_to_send, ledger_initial_len, last_id, &tvu_address)
.into_iter()
.rev()
.collect();
let msgs = make_consecutive_blobs(
&leader_id,
blobs_to_send,
ledger_initial_len,
last_id,
&tvu_address,
)
.into_iter()
.rev()
.collect();
s_responder.send(msgs).expect("send");
t_responder
};

View File

@ -6,6 +6,7 @@ use byteorder::{ByteOrder, LittleEndian};
use serde::Serialize;
use solana_metrics::counter::Counter;
pub use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::fmt;
use std::io;
@ -278,7 +279,8 @@ macro_rules! range {
const PARENT_RANGE: std::ops::Range<usize> = range!(0, u64);
const SLOT_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
const FORWARD_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, bool);
const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey);
const FORWARD_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, bool);
const FLAGS_RANGE: std::ops::Range<usize> = range!(FORWARD_RANGE.end, u32);
const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64);
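With ID_RANGE spliced in between INDEX_RANGE and FORWARD_RANGE, the header lays out as parent 0..8, slot 8..16, index 16..24, id 24..56, forward 56..57, flags 57..61, size 61..69 (offsets derived here from the field sizes, u64 = 8 bytes, Pubkey = 32, bool = 1, u32 = 4, not copied from the crate). A self-contained round-trip sketch, with plain byte ranges in place of the range! macro and `[u8; 32]` in place of `Pubkey`:

```rust
// Minimal sketch of the new header layout; offsets assume the field
// sizes listed above rather than constants taken from the crate.
const PARENT: std::ops::Range<usize> = 0..8; // u64
const SLOT: std::ops::Range<usize> = 8..16; // u64
const INDEX: std::ops::Range<usize> = 16..24; // u64
const ID: std::ops::Range<usize> = 24..56; // Pubkey: the reintroduced 32-byte sender id
const FORWARD: std::ops::Range<usize> = 56..57; // bool

fn main() {
    let mut header = [0u8; 57];
    let sender = [7u8; 32]; // stand-in for a Pubkey
    header[ID].copy_from_slice(&sender); // what set_id() does
    assert_eq!(&header[ID], &sender[..]); // what id() reads back
    println!("parent {:?} slot {:?} index {:?} id {:?} forward {:?}",
             PARENT, SLOT, INDEX, ID, FORWARD);
}
```

Every field after INDEX_RANGE shifts by 32 bytes, which is also why the golden hash in the test above had to be regenerated.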
@ -323,6 +325,16 @@ impl Blob {
LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix);
}
/// Sender id, used to identify whether this is a blob from the leader that we should
/// retransmit. Eventually blobs should carry a signature that we can use for spam filtering.
pub fn id(&self) -> Pubkey {
Pubkey::new(&self.data[ID_RANGE])
}
pub fn set_id(&mut self, id: &Pubkey) {
self.data[ID_RANGE].copy_from_slice(id.as_ref())
}
/// Used to determine whether or not this blob should be forwarded in retransmit.
/// A bool is used here instead of a flag because this item is not intended to be signed when
/// blob signatures are introduced.
@ -451,13 +463,14 @@ impl Blob {
}
}
pub fn index_blobs(blobs: &[SharedBlob], blob_index: &mut u64, slot: u64) {
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: &mut u64, slot: u64) {
// enumerate all the blobs; their positions become the indices
for blob in blobs.iter() {
let mut blob = blob.write().unwrap();
blob.set_index(*blob_index);
blob.set_slot(slot);
blob.set_id(id);
blob.forward(true);
*blob_index += 1;
}

View File

@ -138,7 +138,7 @@ impl Replicator {
let genesis_block =
GenesisBlock::load(ledger_path).expect("Expected to successfully open genesis block");
let (bank_forks, _bank_forks_info) =
let (_bank_forks, _bank_forks_info) =
blocktree_processor::process_blocktree(&genesis_block, &blocktree, None)
.expect("process_blocktree failed");
@ -183,7 +183,6 @@ impl Replicator {
blob_fetch_receiver,
retransmit_sender,
repair_socket,
Arc::new(RwLock::new(bank_forks)),
exit.clone(),
);

View File

@ -128,7 +128,6 @@ impl RetransmitStage {
fetch_stage_receiver,
retransmit_sender,
repair_socket,
bank_forks.clone(),
exit,
);

View File

@ -1,6 +1,5 @@
//! The `window_service` provides a thread for maintaining a window (tail of the ledger).
//!
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::db_window::*;
@ -30,7 +29,6 @@ pub enum WindowServiceReturnType {
fn recv_window(
blocktree: &Arc<Blocktree>,
id: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
r: &BlobReceiver,
retransmit: &BlobSender,
) -> Result<()> {
@ -49,7 +47,7 @@ fn recv_window(
.to_owned(),
);
retransmit_blobs(&dq, bank_forks, retransmit, id)?;
retransmit_blobs(&dq, retransmit, id)?;
//send a contiguous set of blocks
trace!("{} num blobs received: {}", id, dq.len());
@ -62,7 +60,7 @@ fn recv_window(
trace!("{} window pix: {} size: {}", id, pix, meta_size);
let _ = process_blob(bank_forks, blocktree, &b);
let _ = process_blob(blocktree, &b);
}
trace!(
@ -104,7 +102,6 @@ impl WindowService {
r: BlobReceiver,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
bank_forks: Arc<RwLock<BankForks>>,
exit: Arc<AtomicBool>,
) -> WindowService {
let exit_ = exit.clone();
@ -124,7 +121,7 @@ impl WindowService {
if exit.load(Ordering::Relaxed) {
break;
}
if let Err(e) = recv_window(&blocktree, &id, &bank_forks, &r, &retransmit) {
if let Err(e) = recv_window(&blocktree, &id, &r, &retransmit) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
@ -156,7 +153,6 @@ impl Service for WindowService {
#[cfg(test)]
mod test {
use crate::bank_forks::BankForks;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, Node};
@ -164,8 +160,6 @@ mod test {
use crate::service::Service;
use crate::streamer::{blob_receiver, responder};
use crate::window_service::WindowService;
use solana_runtime::bank::Bank;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
@ -195,17 +189,12 @@ mod test {
let blocktree = Arc::new(
Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
);
let (genesis_block, _) = GenesisBlock::new(100);
let bank0 = Bank::new(&genesis_block);
let bank_id = 0;
let bank_forks = BankForks::new(bank_id, bank0);
let t_window = WindowService::new(
blocktree,
subs,
r_reader,
s_retransmit,
Arc::new(leader_node.sockets.repair),
Arc::new(RwLock::new(bank_forks)),
exit.clone(),
);
let t_responder = {
@ -216,11 +205,16 @@ mod test {
let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let num_blobs_to_make = 10;
let gossip_address = &leader_node.info.gossip;
let msgs =
make_consecutive_blobs(num_blobs_to_make, 0, Hash::default(), &gossip_address)
.into_iter()
.rev()
.collect();
let msgs = make_consecutive_blobs(
&me_id,
num_blobs_to_make,
0,
Hash::default(),
&gossip_address,
)
.into_iter()
.rev()
.collect();
s_responder.send(msgs).expect("send");
t_responder
};
@ -267,17 +261,12 @@ mod test {
let blocktree = Arc::new(
Blocktree::open(&blocktree_path).expect("Expected to be able to open database ledger"),
);
let (genesis_block, _) = GenesisBlock::new(100);
let bank0 = Bank::new(&genesis_block);
let bank_id = 0;
let bank_forks = BankForks::new(bank_id, bank0);
let t_window = WindowService::new(
blocktree,
subs.clone(),
r_reader,
s_retransmit,
Arc::new(leader_node.sockets.repair),
Arc::new(RwLock::new(bank_forks)),
exit.clone(),
);
let t_responder = {
@ -286,7 +275,8 @@ mod test {
leader_node.sockets.tvu.into_iter().map(Arc::new).collect();
let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let mut msgs = Vec::new();
let blobs = make_consecutive_blobs(14u64, 0, Hash::default(), &leader_node.info.gossip);
let blobs =
make_consecutive_blobs(&me_id, 14u64, 0, Hash::default(), &leader_node.info.gossip);
for v in 0..10 {
let i = 9 - v;

View File

@ -159,7 +159,7 @@ fn test_replay() {
let entries = vec![entry0, entry_tick0, entry_tick1, entry1, entry_tick2];
let blobs = entries.to_shared_blobs();
index_blobs(&blobs, &mut blob_idx, 0);
index_blobs(&blobs, &leader.info.id, &mut blob_idx, 0);
blobs
.iter()
.for_each(|b| b.write().unwrap().meta.set_addr(&tvu_addr));