Add genesis blockhash to blobs (#3953)

This commit is contained in:
Sagar Dhawan 2019-04-23 16:24:44 -07:00 committed by GitHub
parent 4e7e5ace9d
commit 0cbac26591
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 71 additions and 8 deletions

View File

@ -4,7 +4,7 @@ use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, ClusterInfoError, NEIGHBORHOOD_SIZE}; use crate::cluster_info::{ClusterInfo, ClusterInfoError, NEIGHBORHOOD_SIZE};
use crate::entry::{EntrySender, EntrySlice}; use crate::entry::{EntrySender, EntrySlice};
use crate::erasure::CodingGenerator; use crate::erasure::CodingGenerator;
use crate::packet::index_blobs; use crate::packet::index_blobs_with_genesis;
use crate::poh_recorder::WorkingBankEntries; use crate::poh_recorder::WorkingBankEntries;
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use crate::service::Service; use crate::service::Service;
@ -12,6 +12,7 @@ use crate::staking_utils;
use rayon::prelude::*; use rayon::prelude::*;
use solana_metrics::counter::Counter; use solana_metrics::counter::Counter;
use solana_metrics::{influxdb, submit}; use solana_metrics::{influxdb, submit};
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms; use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -39,6 +40,7 @@ impl Broadcast {
sock: &UdpSocket, sock: &UdpSocket,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
storage_entry_sender: &EntrySender, storage_entry_sender: &EntrySender,
genesis_blockhash: &Hash,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let (mut bank, entries) = receiver.recv_timeout(timer)?; let (mut bank, entries) = receiver.recv_timeout(timer)?;
@ -100,9 +102,10 @@ impl Broadcast {
.map(|meta| meta.consumed) .map(|meta| meta.consumed)
.unwrap_or(0); .unwrap_or(0);
index_blobs( index_blobs_with_genesis(
&blobs, &blobs,
&self.id, &self.id,
genesis_blockhash,
blob_index, blob_index,
bank.slot(), bank.slot(),
bank.parent().map_or(0, |parent| parent.slot()), bank.parent().map_or(0, |parent| parent.slot()),
@ -184,6 +187,7 @@ impl BroadcastStage {
receiver: &Receiver<WorkingBankEntries>, receiver: &Receiver<WorkingBankEntries>,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
storage_entry_sender: EntrySender, storage_entry_sender: EntrySender,
genesis_blockhash: &Hash,
) -> BroadcastStageReturnType { ) -> BroadcastStageReturnType {
let me = cluster_info.read().unwrap().my_data().clone(); let me = cluster_info.read().unwrap().my_data().clone();
let coding_generator = CodingGenerator::default(); let coding_generator = CodingGenerator::default();
@ -200,6 +204,7 @@ impl BroadcastStage {
sock, sock,
blocktree, blocktree,
&storage_entry_sender, &storage_entry_sender,
genesis_blockhash,
) { ) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => {
@ -239,9 +244,11 @@ impl BroadcastStage {
exit_sender: &Arc<AtomicBool>, exit_sender: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>, blocktree: &Arc<Blocktree>,
storage_entry_sender: EntrySender, storage_entry_sender: EntrySender,
genesis_blockhash: &Hash,
) -> Self { ) -> Self {
let blocktree = blocktree.clone(); let blocktree = blocktree.clone();
let exit_sender = exit_sender.clone(); let exit_sender = exit_sender.clone();
let genesis_blockhash = *genesis_blockhash;
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string()) .name("solana-broadcaster".to_string())
.spawn(move || { .spawn(move || {
@ -252,6 +259,7 @@ impl BroadcastStage {
&receiver, &receiver,
&blocktree, &blocktree,
storage_entry_sender, storage_entry_sender,
&genesis_blockhash,
) )
}) })
.unwrap(); .unwrap();
@ -325,6 +333,7 @@ mod test {
&exit_sender, &exit_sender,
&blocktree, &blocktree,
storage_sender, storage_sender,
&Hash::default(),
); );
MockBroadcastStage { MockBroadcastStage {

View File

@ -159,7 +159,7 @@ mod tests {
use bs58; use bs58;
// golden needs to be updated if blob stuff changes.... // golden needs to be updated if blob stuff changes....
let golden = Hash::new( let golden = Hash::new(
&bs58::decode("GD6xs6Loh9gci6b6P8FVVJ1c1whCqxDzaqBrQkpcxowA") &bs58::decode("5Pz5KQyNht2nqkJhVd8F9zTFxzoDvbQSzaxQbtCPiyCo")
.into_vec() .into_vec()
.unwrap(), .unwrap(),
); );

View File

@ -10,6 +10,7 @@ use crate::entry::next_entry_mut;
use crate::entry::Entry; use crate::entry::Entry;
use crate::gossip_service::{discover_nodes, GossipService}; use crate::gossip_service::{discover_nodes, GossipService};
use crate::leader_schedule_cache::LeaderScheduleCache; use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::poh_recorder::PohRecorder;
use crate::poh_service::{PohService, PohServiceConfig}; use crate::poh_service::{PohService, PohServiceConfig};
use crate::rpc::JsonRpcConfig; use crate::rpc::JsonRpcConfig;
use crate::rpc_pubsub_service::PubSubService; use crate::rpc_pubsub_service::PubSubService;
@ -19,8 +20,6 @@ use crate::service::Service;
use crate::storage_stage::StorageState; use crate::storage_stage::StorageState;
use crate::tpu::Tpu; use crate::tpu::Tpu;
use crate::tvu::{Sockets, Tvu}; use crate::tvu::{Sockets, Tvu};
use crate::poh_recorder::PohRecorder;
use solana_metrics::counter::Counter; use solana_metrics::counter::Counter;
use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
@ -102,6 +101,7 @@ impl Fullnode {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let bank_info = &bank_forks_info[0]; let bank_info = &bank_forks_info[0];
let bank = bank_forks[bank_info.bank_slot].clone(); let bank = bank_forks[bank_info.bank_slot].clone();
let genesis_blockhash = bank.last_blockhash();
info!( info!(
"starting PoH... {} {}", "starting PoH... {} {}",
@ -237,6 +237,7 @@ impl Fullnode {
receiver, receiver,
&leader_schedule_cache, &leader_schedule_cache,
&exit, &exit,
&genesis_blockhash,
); );
if config.sigverify_disabled { if config.sigverify_disabled {
@ -256,6 +257,7 @@ impl Fullnode {
sender, sender,
&leader_schedule_cache, &leader_schedule_cache,
&exit, &exit,
&genesis_blockhash,
); );
inc_new_counter_info!("fullnode-new", 1); inc_new_counter_info!("fullnode-new", 1);

View File

@ -5,6 +5,7 @@ use bincode;
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use serde::Serialize; use serde::Serialize;
use solana_metrics::counter::Counter; use solana_metrics::counter::Counter;
use solana_sdk::hash::Hash;
pub use solana_sdk::packet::PACKET_DATA_SIZE; pub use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::borrow::Borrow; use std::borrow::Borrow;
@ -354,7 +355,8 @@ const SLOT_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64); const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey); const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey);
const FORWARDED_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, bool); const FORWARDED_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, bool);
const FLAGS_RANGE: std::ops::Range<usize> = range!(FORWARDED_RANGE.end, u32); const GENESIS_RANGE: std::ops::Range<usize> = range!(FORWARDED_RANGE.end, Hash);
const FLAGS_RANGE: std::ops::Range<usize> = range!(GENESIS_RANGE.end, u32);
const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64); const SIZE_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, u64);
macro_rules! align { macro_rules! align {
@ -435,6 +437,14 @@ impl Blob {
self.data[FORWARDED_RANGE][0] = u8::from(forward) self.data[FORWARDED_RANGE][0] = u8::from(forward)
} }
pub fn set_genesis_blockhash(&mut self, blockhash: &Hash) {
self.data[GENESIS_RANGE].copy_from_slice(blockhash.as_ref())
}
pub fn genesis_blockhash(&self) -> Hash {
Hash::new(&self.data[GENESIS_RANGE])
}
pub fn flags(&self) -> u32 { pub fn flags(&self) -> u32 {
LittleEndian::read_u32(&self.data[FLAGS_RANGE]) LittleEndian::read_u32(&self.data[FLAGS_RANGE])
} }
@ -573,12 +583,24 @@ impl Blob {
} }
} }
pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: u64, parent: u64) { pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, blob_index: u64, slot: u64, parent: u64) {
index_blobs_with_genesis(blobs, id, &Hash::default(), blob_index, slot, parent)
}
pub fn index_blobs_with_genesis(
blobs: &[SharedBlob],
id: &Pubkey,
genesis: &Hash,
mut blob_index: u64,
slot: u64,
parent: u64,
) {
// enumerate all the blobs, those are the indices // enumerate all the blobs, those are the indices
for blob in blobs.iter() { for blob in blobs.iter() {
let mut blob = blob.write().unwrap(); let mut blob = blob.write().unwrap();
blob.set_index(blob_index); blob.set_index(blob_index);
blob.set_genesis_blockhash(genesis);
blob.set_slot(slot); blob.set_slot(slot);
blob.set_parent(parent); blob.set_parent(parent);
blob.set_id(id); blob.set_id(id);
@ -837,4 +859,14 @@ mod tests {
assert!(p1 != p2); assert!(p1 != p2);
} }
#[test]
fn test_blob_genesis_blockhash() {
let mut blob = Blob::default();
assert_eq!(blob.genesis_blockhash(), Hash::default());
let hash = Hash::new(&Pubkey::new_rand().as_ref());
blob.set_genesis_blockhash(&hash);
assert_eq!(blob.genesis_blockhash(), hash);
}
} }

View File

@ -243,6 +243,7 @@ impl Replicator {
repair_socket, repair_socket,
&exit, &exit,
Some(repair_slot_range), Some(repair_slot_range),
&Hash::default(),
); );
let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE); let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);

View File

@ -13,6 +13,7 @@ use crate::streamer::BlobReceiver;
use crate::window_service::WindowService; use crate::window_service::WindowService;
use solana_metrics::counter::Counter; use solana_metrics::counter::Counter;
use solana_metrics::{influxdb, submit}; use solana_metrics::{influxdb, submit};
use solana_sdk::hash::Hash;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
@ -120,6 +121,7 @@ impl RetransmitStage {
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
fetch_stage_receiver: BlobReceiver, fetch_stage_receiver: BlobReceiver,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
genesis_blockhash: &Hash,
) -> Self { ) -> Self {
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
@ -140,6 +142,7 @@ impl RetransmitStage {
repair_socket, repair_socket,
exit, exit,
None, None,
genesis_blockhash,
); );
let thread_hdls = vec![t_retransmit]; let thread_hdls = vec![t_retransmit];

View File

@ -12,6 +12,7 @@ use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::poh_recorder::{PohRecorder, WorkingBankEntries};
use crate::service::Service; use crate::service::Service;
use crate::sigverify_stage::SigVerifyStage; use crate::sigverify_stage::SigVerifyStage;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
@ -42,6 +43,7 @@ impl Tpu {
storage_entry_sender: EntrySender, storage_entry_sender: EntrySender,
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
genesis_blockhash: &Hash,
) -> Self { ) -> Self {
cluster_info.write().unwrap().set_leader(id); cluster_info.write().unwrap().set_leader(id);
@ -82,6 +84,7 @@ impl Tpu {
&exit, &exit,
blocktree, blocktree,
storage_entry_sender, storage_entry_sender,
genesis_blockhash,
); );
Self { Self {

View File

@ -26,6 +26,7 @@ use crate::retransmit_stage::RetransmitStage;
use crate::rpc_subscriptions::RpcSubscriptions; use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service; use crate::service::Service;
use crate::storage_stage::{StorageStage, StorageState}; use crate::storage_stage::{StorageStage, StorageState};
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::UdpSocket; use std::net::UdpSocket;
@ -74,6 +75,7 @@ impl Tvu {
storage_entry_receiver: EntryReceiver, storage_entry_receiver: EntryReceiver,
leader_schedule_cache: &Arc<LeaderScheduleCache>, leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
genesis_blockhash: &Hash,
) -> Self ) -> Self
where where
T: 'static + KeypairUtil + Sync + Send, T: 'static + KeypairUtil + Sync + Send,
@ -110,6 +112,7 @@ impl Tvu {
repair_socket, repair_socket,
blob_fetch_receiver, blob_fetch_receiver,
&exit, &exit,
genesis_blockhash,
); );
let (replay_stage, slot_full_receiver) = ReplayStage::new( let (replay_stage, slot_full_receiver) = ReplayStage::new(
@ -243,6 +246,7 @@ pub mod tests {
storage_entry_receiver, storage_entry_receiver,
&leader_schedule_cache, &leader_schedule_cache,
&exit, &exit,
&Hash::default(),
); );
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
tvu.join().unwrap(); tvu.join().unwrap();

View File

@ -13,6 +13,7 @@ use crate::service::Service;
use crate::streamer::{BlobReceiver, BlobSender}; use crate::streamer::{BlobReceiver, BlobSender};
use solana_metrics::counter::Counter; use solana_metrics::counter::Counter;
use solana_runtime::bank::Bank; use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms; use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket; use std::net::UdpSocket;
@ -107,6 +108,7 @@ fn recv_window(
my_id: &Pubkey, my_id: &Pubkey,
r: &BlobReceiver, r: &BlobReceiver,
retransmit: &BlobSender, retransmit: &BlobSender,
genesis_blockhash: &Hash,
) -> Result<()> { ) -> Result<()> {
let timer = Duration::from_millis(200); let timer = Duration::from_millis(200);
let mut blobs = r.recv_timeout(timer)?; let mut blobs = r.recv_timeout(timer)?;
@ -125,7 +127,7 @@ fn recv_window(
.as_ref(), .as_ref(),
leader_schedule_cache, leader_schedule_cache,
my_id, my_id,
) ) && blob.read().unwrap().genesis_blockhash() == *genesis_blockhash
}); });
retransmit_blobs(&blobs, retransmit, my_id)?; retransmit_blobs(&blobs, retransmit, my_id)?;
@ -166,6 +168,7 @@ pub struct WindowService {
} }
impl WindowService { impl WindowService {
#[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
bank_forks: Option<Arc<RwLock<BankForks>>>, bank_forks: Option<Arc<RwLock<BankForks>>>,
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>, leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
@ -176,6 +179,7 @@ impl WindowService {
repair_socket: Arc<UdpSocket>, repair_socket: Arc<UdpSocket>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
repair_slot_range: Option<RepairSlotRange>, repair_slot_range: Option<RepairSlotRange>,
genesis_blockhash: &Hash,
) -> WindowService { ) -> WindowService {
let repair_service = RepairService::new( let repair_service = RepairService::new(
blocktree.clone(), blocktree.clone(),
@ -187,6 +191,7 @@ impl WindowService {
let exit = exit.clone(); let exit = exit.clone();
let bank_forks = bank_forks.clone(); let bank_forks = bank_forks.clone();
let leader_schedule_cache = leader_schedule_cache.clone(); let leader_schedule_cache = leader_schedule_cache.clone();
let hash = *genesis_blockhash;
let t_window = Builder::new() let t_window = Builder::new()
.name("solana-window".to_string()) .name("solana-window".to_string())
.spawn(move || { .spawn(move || {
@ -204,6 +209,7 @@ impl WindowService {
&id, &id,
&r, &r,
&retransmit, &retransmit,
&hash,
) { ) {
match e { match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@ -357,6 +363,7 @@ mod test {
Arc::new(leader_node.sockets.repair), Arc::new(leader_node.sockets.repair),
&exit, &exit,
None, None,
&Hash::default(),
); );
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
@ -434,6 +441,7 @@ mod test {
Arc::new(leader_node.sockets.repair), Arc::new(leader_node.sockets.repair),
&exit, &exit,
None, None,
&Hash::default(),
); );
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();

View File

@ -129,6 +129,7 @@ fn test_replay() {
storage_receiver, storage_receiver,
&leader_schedule_cache, &leader_schedule_cache,
&exit, &exit,
&solana_sdk::hash::Hash::default(),
); );
let mut mint_ref_balance = starting_mint_balance; let mut mint_ref_balance = starting_mint_balance;