From a7b695c27a4e31d99476f790f3e6903f73629087 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Fri, 3 May 2019 16:27:53 -0700 Subject: [PATCH] Change replicators to slot-based (#4118) --- client/src/rpc_request.rs | 8 +- core/src/broadcast_stage.rs | 25 +- core/src/chacha.rs | 5 +- core/src/chacha_cuda.rs | 9 +- core/src/fullnode.rs | 11 +- core/src/local_cluster.rs | 5 +- core/src/replay_stage.rs | 172 +----------- core/src/replicator.rs | 38 ++- core/src/rpc.rs | 39 +-- core/src/storage_stage.rs | 252 +++++++++--------- core/src/tpu.rs | 3 - core/src/tvu.rs | 19 +- core/tests/tvu.rs | 6 +- programs/storage_api/src/lib.rs | 6 +- programs/storage_api/src/storage_contract.rs | 44 +-- .../storage_api/src/storage_instruction.rs | 31 +-- programs/storage_api/src/storage_processor.rs | 66 ++--- 17 files changed, 255 insertions(+), 484 deletions(-) diff --git a/client/src/rpc_request.rs b/client/src/rpc_request.rs index 00450e53a..730016a6f 100644 --- a/client/src/rpc_request.rs +++ b/client/src/rpc_request.rs @@ -14,8 +14,8 @@ pub enum RpcRequest { GetSignatureStatus, GetSlotLeader, GetStorageBlockhash, - GetStorageEntryHeight, - GetStoragePubkeysForEntryHeight, + GetStorageSlot, + GetStoragePubkeysForSlot, GetTransactionCount, RegisterNode, RequestAirdrop, @@ -40,8 +40,8 @@ impl RpcRequest { RpcRequest::GetSignatureStatus => "getSignatureStatus", RpcRequest::GetSlotLeader => "getSlotLeader", RpcRequest::GetStorageBlockhash => "getStorageBlockhash", - RpcRequest::GetStorageEntryHeight => "getStorageEntryHeight", - RpcRequest::GetStoragePubkeysForEntryHeight => "getStoragePubkeysForEntryHeight", + RpcRequest::GetStorageSlot => "getStorageSlot", + RpcRequest::GetStoragePubkeysForSlot => "getStoragePubkeysForSlot", RpcRequest::GetTransactionCount => "getTransactionCount", RpcRequest::RegisterNode => "registerNode", RpcRequest::RequestAirdrop => "requestAirdrop", diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index e2205632a..d46ef69ca 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -2,7 +2,7 @@ //! use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, ClusterInfoError, NEIGHBORHOOD_SIZE}; -use crate::entry::{EntrySender, EntrySlice}; +use crate::entry::EntrySlice; use crate::erasure::CodingGenerator; use crate::packet::index_blobs_with_genesis; use crate::poh_recorder::WorkingBankEntries; @@ -39,7 +39,6 @@ impl Broadcast { receiver: &Receiver, sock: &UdpSocket, blocktree: &Arc, - storage_entry_sender: &EntrySender, genesis_blockhash: &Hash, ) -> Result<()> { let timer = Duration::new(1, 0); @@ -87,11 +86,9 @@ impl Broadcast { let blobs: Vec<_> = ventries .into_par_iter() - .map_with(storage_entry_sender.clone(), |s, p| { + .map(|p| { let entries: Vec<_> = p.into_iter().map(|e| e.0).collect(); - let blobs = entries.to_shared_blobs(); - let _ignored = s.send(entries); - blobs + entries.to_shared_blobs() }) .flatten() .collect(); @@ -186,7 +183,6 @@ impl BroadcastStage { cluster_info: &Arc>, receiver: &Receiver, blocktree: &Arc, - storage_entry_sender: EntrySender, genesis_blockhash: &Hash, ) -> BroadcastStageReturnType { let me = cluster_info.read().unwrap().my_data().clone(); @@ -198,14 +194,9 @@ impl BroadcastStage { }; loop { - if let Err(e) = broadcast.run( - &cluster_info, - receiver, - sock, - blocktree, - &storage_entry_sender, - genesis_blockhash, - ) { + if let Err(e) = + broadcast.run(&cluster_info, receiver, sock, blocktree, genesis_blockhash) + { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => { return BroadcastStageReturnType::ChannelDisconnected; @@ -243,7 +234,6 @@ impl BroadcastStage { receiver: Receiver, exit_sender: &Arc, blocktree: &Arc, - storage_entry_sender: EntrySender, genesis_blockhash: &Hash, ) -> Self { let blocktree = blocktree.clone(); @@ -258,7 +248,6 @@ impl BroadcastStage { &cluster_info, &receiver, &blocktree, - storage_entry_sender, &genesis_blockhash, ) }) @@ -320,7 +309,6 @@ mod test { let cluster_info = Arc::new(RwLock::new(cluster_info)); let exit_sender = Arc::new(AtomicBool::new(false)); - let (storage_sender, _receiver) = channel(); let (genesis_block, _) = GenesisBlock::new(10_000); let bank = Arc::new(Bank::new(&genesis_block)); @@ -332,7 +320,6 @@ mod test { entry_receiver, &exit_sender, &blocktree, - storage_sender, &Hash::default(), ); diff --git a/core/src/chacha.rs b/core/src/chacha.rs index c8539e867..da70cbc0e 100644 --- a/core/src/chacha.rs +++ b/core/src/chacha.rs @@ -5,7 +5,7 @@ use std::io::{BufWriter, Write}; use std::path::Path; use std::sync::Arc; -use crate::storage_stage::ENTRIES_PER_SEGMENT; +use crate::storage_stage::SLOTS_PER_SEGMENT; pub const CHACHA_BLOCK_SIZE: usize = 64; pub const CHACHA_KEY_SIZE: usize = 32; @@ -50,8 +50,7 @@ pub fn chacha_cbc_encrypt_ledger( let mut entry = slice; loop { - match blocktree.read_blobs_bytes(entry, ENTRIES_PER_SEGMENT - total_entries, &mut buffer, 0) - { + match blocktree.read_blobs_bytes(entry, SLOTS_PER_SEGMENT - total_entries, &mut buffer, 0) { Ok((num_entries, entry_len)) => { debug!( "chacha: encrypting slice: {} num_entries: {} entry_len: {}", diff --git a/core/src/chacha_cuda.rs b/core/src/chacha_cuda.rs index b3310a1b1..44aab635e 100644 --- a/core/src/chacha_cuda.rs +++ b/core/src/chacha_cuda.rs @@ -11,7 +11,7 @@ use std::io; use std::mem::size_of; use std::sync::Arc; -use crate::storage_stage::ENTRIES_PER_SEGMENT; +use crate::storage_stage::SLOTS_PER_SEGMENT; // Encrypt a file with multiple starting IV states, determined by ivecs.len() // @@ -47,8 +47,7 @@ pub fn chacha_cbc_encrypt_file_many_keys( chacha_init_sha_state(int_sha_states.as_mut_ptr(), num_keys as u32); } loop { - match blocktree.read_blobs_bytes(entry, ENTRIES_PER_SEGMENT - total_entries, &mut buffer, 0) - { + match blocktree.read_blobs_bytes(entry, SLOTS_PER_SEGMENT - total_entries, &mut buffer, 0) { Ok((num_entries, entry_len)) => { debug!( "chacha_cuda: encrypting segment: {} num_entries: {} entry_len: {}", @@ -78,9 +77,9 @@ pub fn chacha_cbc_encrypt_file_many_keys( entry += num_entries; debug!( "total entries: {} entry: {} segment: {} entries_per_segment: {}", - total_entries, entry, segment, ENTRIES_PER_SEGMENT + total_entries, entry, segment, SLOTS_PER_SEGMENT ); - if (entry - segment) >= ENTRIES_PER_SEGMENT { + if (entry - segment) >= SLOTS_PER_SEGMENT { break; } } diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index ecfee3e3b..bd91c4f18 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -32,7 +32,7 @@ use solana_vote_api::vote_instruction; use solana_vote_api::vote_state::Vote; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver}; +use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex, RwLock}; use std::thread::Result; @@ -51,7 +51,7 @@ impl Default for FullnodeConfig { // TODO: remove this, temporary parameter to configure // storage amount differently for test configurations // so tests don't take forever to run. - const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 1024; + const NUM_HASHES_FOR_STORAGE_ROTATE: u64 = 128; Self { sigverify_disabled: false, voting_disabled: false, @@ -220,14 +220,10 @@ impl Fullnode { Some(Arc::new(voting_keypair)) }; - // Setup channel for sending entries to storage stage - let (sender, receiver) = channel(); - let tvu = Tvu::new( vote_account, voting_keypair, &bank_forks, - &bank_forks_info, &cluster_info, sockets, blocktree.clone(), @@ -237,8 +233,6 @@ impl Fullnode { ledger_signal_receiver, &subscriptions, &poh_recorder, - sender.clone(), - receiver, &leader_schedule_cache, &exit, &genesis_blockhash, @@ -258,7 +252,6 @@ impl Fullnode { node.sockets.broadcast, config.sigverify_disabled, &blocktree, - sender, &exit, &genesis_blockhash, ); diff --git a/core/src/local_cluster.rs b/core/src/local_cluster.rs index f9dd8c1ae..3a14c131f 100644 --- a/core/src/local_cluster.rs +++ b/core/src/local_cluster.rs @@ -56,6 +56,7 @@ pub struct ClusterConfig { /// The fullnode config that should be applied to every node in the cluster pub fullnode_config: FullnodeConfig, /// Number of replicators in the cluster + /// Note- replicators will timeout if ticks_per_slot is much larger than the default 8 pub num_replicators: u64, /// Number of nodes that are unstaked and not voting (a.k.a listening) pub num_listeners: u64, @@ -449,6 +450,7 @@ impl Drop for LocalCluster { #[cfg(test)] mod test { use super::*; + use crate::storage_stage::STORAGE_ROTATE_TEST_COUNT; use solana_runtime::bank::MINIMUM_SLOT_LENGTH; #[test] @@ -465,6 +467,7 @@ mod test { solana_logger::setup(); let mut fullnode_config = FullnodeConfig::default(); fullnode_config.rpc_config.enable_fullnode_exit = true; + fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT; const NUM_NODES: usize = 1; let num_replicators = 1; let config = ClusterConfig { @@ -472,7 +475,7 @@ mod test { num_replicators: 1, node_stakes: vec![3; NUM_NODES], cluster_lamports: 100, - ticks_per_slot: 16, + ticks_per_slot: 8, slots_per_epoch: MINIMUM_SLOT_LENGTH as u64, ..ClusterConfig::default() }; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 03e44b436..8ebc2b48d 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -4,7 +4,7 @@ use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; use crate::blocktree_processor; use crate::cluster_info::ClusterInfo; -use crate::entry::{Entry, EntrySender, EntrySlice}; +use crate::entry::{Entry, EntrySlice}; use crate::leader_schedule_cache::LeaderScheduleCache; use crate::leader_schedule_utils; use crate::locktower::{Locktower, StakeLockout}; @@ -83,12 +83,12 @@ impl ReplayStage { ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, - storage_entry_sender: EntrySender, leader_schedule_cache: &Arc, - ) -> (Self, Receiver<(u64, Pubkey)>) + ) -> (Self, Receiver<(u64, Pubkey)>, Receiver) where T: 'static + KeypairUtil + Send + Sync, { + let (root_slot_sender, root_slot_receiver) = channel(); let (slot_full_sender, slot_full_receiver) = channel(); trace!("replay stage"); let exit_ = exit.clone(); @@ -132,7 +132,6 @@ impl ReplayStage { &my_id, &mut ticks_per_slot, &mut progress, - &storage_entry_sender, &slot_full_sender, )?; @@ -158,6 +157,7 @@ impl ReplayStage { &cluster_info, &blocktree, &leader_schedule_cache, + &root_slot_sender, )?; Self::reset_poh_recorder( @@ -213,7 +213,7 @@ impl ReplayStage { Ok(()) }) .unwrap(); - (Self { t_replay }, slot_full_receiver) + (Self { t_replay }, slot_full_receiver, root_slot_receiver) } pub fn start_leader( my_id: &Pubkey, @@ -283,12 +283,10 @@ impl ReplayStage { bank: &Bank, blocktree: &Blocktree, progress: &mut HashMap, - forward_entry_sender: &EntrySender, ) -> Result<()> { let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?; let len = entries.len(); - let result = - Self::replay_entries_into_bank(bank, entries, progress, forward_entry_sender, num); + let result = Self::replay_entries_into_bank(bank, entries, progress, num); if result.is_ok() { trace!("verified entries {}", len); inc_new_counter_info!("replicate-stage_process_entries", len); @@ -300,6 +298,7 @@ impl ReplayStage { Ok(()) } + #[allow(clippy::too_many_arguments)] fn handle_votable_bank( bank: &Arc, bank_forks: &Arc>, @@ -310,6 +309,7 @@ impl ReplayStage { cluster_info: &Arc>, blocktree: &Arc, leader_schedule_cache: &Arc, + root_slot_sender: &Sender, ) -> Result<()> where T: 'static + KeypairUtil + Send + Sync, @@ -319,6 +319,7 @@ impl ReplayStage { leader_schedule_cache.set_root(new_root); blocktree.set_root(new_root)?; Self::handle_new_root(&bank_forks, progress); + root_slot_sender.send(new_root)?; } locktower.update_epoch(&bank); if let Some(ref voting_keypair) = voting_keypair { @@ -365,7 +366,6 @@ impl ReplayStage { my_id: &Pubkey, ticks_per_slot: &mut u64, progress: &mut HashMap, - forward_entry_sender: &EntrySender, slot_full_sender: &Sender<(u64, Pubkey)>, ) -> Result<()> { let active_banks = bank_forks.read().unwrap().active_banks(); @@ -375,12 +375,7 @@ impl ReplayStage { let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); *ticks_per_slot = bank.ticks_per_slot(); if bank.collector_id() != *my_id { - Self::replay_blocktree_into_bank( - &bank, - &blocktree, - progress, - &forward_entry_sender, - )?; + Self::replay_blocktree_into_bank(&bank, &blocktree, progress)?; } let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1; if bank.tick_height() == max_tick_height { @@ -514,7 +509,6 @@ impl ReplayStage { bank: &Bank, entries: Vec, progress: &mut HashMap, - forward_entry_sender: &EntrySender, num: usize, ) -> Result<()> { let bank_progress = &mut progress @@ -525,9 +519,6 @@ impl ReplayStage { if let Some(last_entry) = entries.last() { bank_progress.last_entry = last_entry.hash; } - if result.is_ok() { - forward_entry_sender.send(entries)?; - } result } @@ -616,155 +607,14 @@ impl Service for ReplayStage { #[cfg(test)] mod test { use super::*; - use crate::banking_stage::create_test_recorder; - use crate::blocktree::{create_new_tmp_ledger, get_tmp_ledger_path}; - use crate::cluster_info::{ClusterInfo, Node}; - use crate::entry::create_ticks; - use crate::entry::{next_entry_mut, Entry}; - use crate::fullnode::new_banks_from_blocktree; + use crate::blocktree::get_tmp_ledger_path; use crate::packet::Blob; use crate::replay_stage::ReplayStage; - use crate::result::Error; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; - use solana_sdk::signature::{Keypair, KeypairUtil}; - use solana_vote_api::vote_state::Vote; use std::fs::remove_dir_all; - use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; - #[test] - fn test_vote_error_replay_stage_correctness() { - solana_logger::setup(); - // Set up dummy node to host a ReplayStage - let my_keypair = Keypair::new(); - let my_id = my_keypair.pubkey(); - let my_node = Node::new_localhost_with_pubkey(&my_id); - - // Create keypair for the leader - let leader_id = Pubkey::new_rand(); - - let (genesis_block, _mint_keypair) = GenesisBlock::new_with_leader(10_000, &leader_id, 500); - - let (my_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); - - // Set up the cluster info - let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( - my_node.info.clone(), - ))); - - // Set up the replay stage - { - let voting_keypair = Arc::new(Keypair::new()); - let (bank_forks, _bank_forks_info, blocktree, l_receiver, leader_schedule_cache) = - new_banks_from_blocktree(&my_ledger_path, None); - let bank = bank_forks.working_bank(); - let leader_schedule_cache = Arc::new(leader_schedule_cache); - let blocktree = Arc::new(blocktree); - let (exit, poh_recorder, poh_service, _entry_receiver) = - create_test_recorder(&bank, &blocktree); - let (ledger_writer_sender, ledger_writer_receiver) = channel(); - let (replay_stage, _slot_full_receiver) = ReplayStage::new( - &my_keypair.pubkey(), - &voting_keypair.pubkey(), - Some(voting_keypair.clone()), - blocktree.clone(), - &Arc::new(RwLock::new(bank_forks)), - cluster_info_me.clone(), - &exit, - l_receiver, - &Arc::new(RpcSubscriptions::default()), - &poh_recorder, - ledger_writer_sender, - &leader_schedule_cache, - ); - let vote_ix = vote_instruction::vote(&voting_keypair.pubkey(), vec![Vote::new(0)]); - let vote_tx = Transaction::new_signed_instructions( - &[voting_keypair.as_ref()], - vec![vote_ix], - bank.last_blockhash(), - ); - cluster_info_me.write().unwrap().push_vote(vote_tx); - - info!("Send ReplayStage an entry, should see it on the ledger writer receiver"); - let next_tick = create_ticks(1, bank.last_blockhash()); - blocktree - .write_entries(1, 0, 0, genesis_block.ticks_per_slot, next_tick.clone()) - .unwrap(); - - let received_tick = ledger_writer_receiver - .recv() - .expect("Expected to receive an entry on the ledger writer receiver"); - - assert_eq!(next_tick[0], received_tick[0]); - - exit.store(true, Ordering::Relaxed); - replay_stage.join().unwrap(); - poh_service.join().unwrap(); - } - let _ignored = remove_dir_all(&my_ledger_path); - } - - #[test] - fn test_replay_stage_poh_ok_entry_receiver() { - let (forward_entry_sender, forward_entry_receiver) = channel(); - let genesis_block = GenesisBlock::new(10_000).0; - let bank = Arc::new(Bank::new(&genesis_block)); - let mut blockhash = bank.last_blockhash(); - let mut entries = Vec::new(); - for _ in 0..5 { - let entry = next_entry_mut(&mut blockhash, 1, vec![]); //just ticks - entries.push(entry); - } - - let mut progress = HashMap::new(); - let res = ReplayStage::replay_entries_into_bank( - &bank, - entries.clone(), - &mut progress, - &forward_entry_sender, - 0, - ); - assert!(res.is_ok(), "replay failed {:?}", res); - let res = forward_entry_receiver.try_recv(); - match res { - Ok(_) => (), - Err(e) => assert!(false, "Entries were not sent correctly {:?}", e), - } - } - - #[test] - fn test_replay_stage_poh_error_entry_receiver() { - let (forward_entry_sender, forward_entry_receiver) = channel(); - let mut entries = Vec::new(); - for _ in 0..5 { - let entry = Entry::new(&mut Hash::default(), 1, vec![]); //just broken entries - entries.push(entry); - } - - let genesis_block = GenesisBlock::new(10_000).0; - let bank = Arc::new(Bank::new(&genesis_block)); - let mut progress = HashMap::new(); - let res = ReplayStage::replay_entries_into_bank( - &bank, - entries.clone(), - &mut progress, - &forward_entry_sender, - 0, - ); - - match res { - Ok(_) => assert!(false, "Should have failed because entries are broken"), - Err(Error::BlobError(BlobError::VerificationFailed)) => (), - Err(e) => assert!( - false, - "Should have failed because with blob error, instead, got {:?}", - e - ), - } - assert!(forward_entry_receiver.try_recv().is_err()); - } - #[test] fn test_child_slots_of_same_parent() { let ledger_path = get_tmp_ledger_path!(); diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 248e84d4e..04936417a 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -9,7 +9,7 @@ use crate::packet::to_shared_blob; use crate::repair_service::RepairSlotRange; use crate::result::Result; use crate::service::Service; -use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT}; +use crate::storage_stage::SLOTS_PER_SEGMENT; use crate::streamer::receiver; use crate::streamer::responder; use crate::window_service::WindowService; @@ -26,7 +26,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::system_transaction; use solana_sdk::transaction::Transaction; use solana_sdk::transport::TransportError; -use solana_storage_api::storage_instruction; +use solana_storage_api::{get_segment_from_slot, storage_instruction}; use std::fs::File; use std::io; use std::io::BufReader; @@ -107,18 +107,15 @@ pub fn sample_file(in_path: &Path, sample_offsets: &[u64]) -> io::Result { Ok(hasher.result()) } -fn get_entry_heights_from_blockhash( - signature: &ed25519_dalek::Signature, - storage_entry_height: u64, -) -> u64 { +fn get_slot_from_blockhash(signature: &ed25519_dalek::Signature, storage_slot: u64) -> u64 { let signature_vec = signature.to_bytes(); let mut segment_index = u64::from(signature_vec[0]) | (u64::from(signature_vec[1]) << 8) | (u64::from(signature_vec[1]) << 16) | (u64::from(signature_vec[2]) << 24); - let max_segment_index = get_segment_from_entry(storage_entry_height); + let max_segment_index = get_segment_from_slot(storage_slot); segment_index %= max_segment_index as u64; - segment_index * ENTRIES_PER_SEGMENT + segment_index * SLOTS_PER_SEGMENT } fn create_request_processor( @@ -213,16 +210,15 @@ impl Replicator { info!("Looking for leader at {:?}", cluster_entrypoint); crate::gossip_service::discover_nodes(&cluster_entrypoint.gossip, 1)?; - let (storage_blockhash, storage_entry_height) = - Self::poll_for_blockhash_and_entry_height(&cluster_info)?; + let (storage_blockhash, storage_slot) = Self::poll_for_blockhash_and_slot(&cluster_info)?; let node_info = node.info.clone(); let signature = storage_keypair.sign(storage_blockhash.as_ref()); - let slot = get_entry_heights_from_blockhash(&signature, storage_entry_height); + let slot = get_slot_from_blockhash(&signature, storage_slot); info!("replicating slot: {}", slot); let mut repair_slot_range = RepairSlotRange::default(); - repair_slot_range.end = slot + ENTRIES_PER_SEGMENT; + repair_slot_range.end = slot + SLOTS_PER_SEGMENT; repair_slot_range.start = slot; let repair_socket = Arc::new(node.sockets.repair); @@ -324,7 +320,7 @@ impl Replicator { if meta.is_connected { current_slot += 1; warn!("current slot: {}", current_slot); - if current_slot >= start_slot + ENTRIES_PER_SEGMENT { + if current_slot >= start_slot + SLOTS_PER_SEGMENT { break 'outer; } } else { @@ -481,11 +477,11 @@ impl Replicator { } } - pub fn entry_height(&self) -> u64 { + pub fn slot(&self) -> u64 { self.slot } - fn poll_for_blockhash_and_entry_height( + fn poll_for_blockhash_and_slot( cluster_info: &Arc>, ) -> Result<(String, u64)> { for _ in 0..10 { @@ -500,20 +496,20 @@ impl Replicator { .retry_make_rpc_request(&RpcRequest::GetStorageBlockhash, None, 0) .expect("rpc request") .to_string(); - let storage_entry_height = rpc_client - .retry_make_rpc_request(&RpcRequest::GetStorageEntryHeight, None, 0) + let storage_slot = rpc_client + .retry_make_rpc_request(&RpcRequest::GetStorageSlot, None, 0) .expect("rpc request") .as_u64() .unwrap(); - info!("max entry_height: {}", storage_entry_height); - if get_segment_from_entry(storage_entry_height) != 0 { - return Ok((storage_blockhash, storage_entry_height)); + info!("max slot: {}", storage_slot); + if get_segment_from_slot(storage_slot) != 0 { + return Ok((storage_blockhash, storage_slot)); } sleep(Duration::from_secs(3)); } Err(Error::new( ErrorKind::Other, - "Couldn't get blockhash or entry_height", + "Couldn't get blockhash or slot", ))? } } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 237cf817a..9be9988a5 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -105,15 +105,13 @@ impl JsonRpcRequestProcessor { Ok(bs58::encode(hash).into_string()) } - fn get_storage_entry_height(&self) -> Result { - let entry_height = self.storage_state.get_entry_height(); - Ok(entry_height) + fn get_storage_slot(&self) -> Result { + let slot = self.storage_state.get_slot(); + Ok(slot) } - fn get_storage_pubkeys_for_entry_height(&self, entry_height: u64) -> Result> { - Ok(self - .storage_state - .get_pubkeys_for_entry_height(entry_height)) + fn get_storage_pubkeys_for_slot(&self, slot: u64) -> Result> { + Ok(self.storage_state.get_pubkeys_for_slot(slot)) } pub fn fullnode_exit(&self) -> Result { @@ -225,15 +223,11 @@ pub trait RpcSol { #[rpc(meta, name = "getStorageBlockhash")] fn get_storage_blockhash(&self, _: Self::Metadata) -> Result; - #[rpc(meta, name = "getStorageEntryHeight")] - fn get_storage_entry_height(&self, _: Self::Metadata) -> Result; + #[rpc(meta, name = "getStorageSlot")] + fn get_storage_slot(&self, _: Self::Metadata) -> Result; - #[rpc(meta, name = "getStoragePubkeysForEntryHeight")] - fn get_storage_pubkeys_for_entry_height( - &self, - _: Self::Metadata, - _: u64, - ) -> Result>; + #[rpc(meta, name = "getStoragePubkeysForSlot")] + fn get_storage_pubkeys_for_slot(&self, _: Self::Metadata, _: u64) -> Result>; #[rpc(meta, name = "fullnodeExit")] fn fullnode_exit(&self, _: Self::Metadata) -> Result; @@ -464,22 +458,15 @@ impl RpcSol for RpcSolImpl { .get_storage_blockhash() } - fn get_storage_entry_height(&self, meta: Self::Metadata) -> Result { - meta.request_processor - .read() - .unwrap() - .get_storage_entry_height() + fn get_storage_slot(&self, meta: Self::Metadata) -> Result { + meta.request_processor.read().unwrap().get_storage_slot() } - fn get_storage_pubkeys_for_entry_height( - &self, - meta: Self::Metadata, - entry_height: u64, - ) -> Result> { + fn get_storage_pubkeys_for_slot(&self, meta: Self::Metadata, slot: u64) -> Result> { meta.request_processor .read() .unwrap() - .get_storage_pubkeys_for_entry_height(entry_height) + .get_storage_pubkeys_for_slot(slot) } fn fullnode_exit(&self, meta: Self::Metadata) -> Result { diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index de5d6e07c..fff819811 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -7,7 +7,6 @@ use crate::blocktree::Blocktree; #[cfg(all(feature = "chacha", feature = "cuda"))] use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys; use crate::cluster_info::ClusterInfo; -use crate::entry::{Entry, EntryReceiver}; use crate::result::{Error, Result}; use crate::service::Service; use bincode::deserialize; @@ -19,13 +18,14 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::system_instruction; use solana_sdk::transaction::Transaction; -use solana_storage_api::storage_instruction::{self, StorageInstruction}; +use solana_storage_api::storage_instruction::StorageInstruction; +use solana_storage_api::{get_segment_from_slot, storage_instruction}; use std::collections::HashSet; use std::io; use std::mem::size_of; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, RecvTimeoutError, Sender}; +use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, RwLock}; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; @@ -42,7 +42,7 @@ pub struct StorageStateInner { storage_keys: StorageKeys, replicator_map: ReplicatorMap, storage_blockhash: Hash, - entry_height: u64, + slot: u64, } #[derive(Clone, Default)] @@ -55,25 +55,15 @@ pub struct StorageStage { t_storage_create_accounts: JoinHandle<()>, } -macro_rules! cross_boundary { - ($start:expr, $len:expr, $boundary:expr) => { - (($start + $len) & !($boundary - 1)) > $start & !($boundary - 1) - }; -} - -pub const STORAGE_ROTATE_TEST_COUNT: u64 = 128; +pub const STORAGE_ROTATE_TEST_COUNT: u64 = 2; // TODO: some way to dynamically size NUM_IDENTITIES const NUM_IDENTITIES: usize = 1024; pub const NUM_STORAGE_SAMPLES: usize = 4; -pub const ENTRIES_PER_SEGMENT: u64 = 16; +pub const SLOTS_PER_SEGMENT: u64 = 16; const KEY_SIZE: usize = 64; type InstructionSender = Sender; -pub fn get_segment_from_entry(entry_height: u64) -> u64 { - entry_height / ENTRIES_PER_SEGMENT -} - fn get_identity_index_from_signature(key: &Signature) -> usize { let rkey = key.as_ref(); let mut res: usize = (rkey[0] as usize) @@ -94,7 +84,7 @@ impl StorageState { storage_keys, storage_results, replicator_map, - entry_height: 0, + slot: 0, storage_blockhash: Hash::default(), }; @@ -117,14 +107,14 @@ impl StorageState { self.state.read().unwrap().storage_blockhash } - pub fn get_entry_height(&self) -> u64 { - self.state.read().unwrap().entry_height + pub fn get_slot(&self) -> u64 { + self.state.read().unwrap().slot } - pub fn get_pubkeys_for_entry_height(&self, entry_height: u64) -> Vec { + pub fn get_pubkeys_for_slot(&self, slot: u64) -> Vec { // TODO: keep track of age? const MAX_PUBKEYS_TO_RETURN: usize = 5; - let index = (entry_height / ENTRIES_PER_SEGMENT) as usize; + let index = get_segment_from_slot(slot) as usize; let replicator_map = &self.state.read().unwrap().replicator_map; if index < replicator_map.len() { replicator_map[index] @@ -142,18 +132,15 @@ impl StorageStage { #[allow(clippy::too_many_arguments)] pub fn new( storage_state: &StorageState, - storage_entry_receiver: EntryReceiver, + slot_receiver: Receiver, blocktree: Option>, keypair: &Arc, storage_keypair: &Arc, exit: &Arc, - entry_height: u64, bank_forks: &Arc>, storage_rotate_count: u64, cluster_info: &Arc>, ) -> Self { - debug!("storage_stage::new: entry_height: {}", entry_height); - storage_state.state.write().unwrap().entry_height = entry_height; let storage_state_inner = storage_state.state.clone(); let exit0 = exit.clone(); let keypair0 = storage_keypair.clone(); @@ -163,18 +150,16 @@ impl StorageStage { let t_storage_mining_verifier = Builder::new() .name("solana-storage-mining-verify-stage".to_string()) .spawn(move || { - let mut poh_height = 0; let mut current_key = 0; - let mut entry_height = entry_height; + let mut slot_count = 0; loop { if let Some(ref some_blocktree) = blocktree { if let Err(e) = Self::process_entries( &keypair0, &storage_state_inner, - &storage_entry_receiver, + &slot_receiver, &some_blocktree, - &mut poh_height, - &mut entry_height, + &mut slot_count, &mut current_key, storage_rotate_count, &instruction_sender, @@ -283,27 +268,23 @@ impl StorageStage { keypair: &Arc, _blocktree: &Arc, entry_id: Hash, - entry_height: u64, + slot: u64, instruction_sender: &InstructionSender, ) -> Result<()> { let mut seed = [0u8; 32]; let signature = keypair.sign(&entry_id.as_ref()); - let ix = storage_instruction::advertise_recent_blockhash( - &keypair.pubkey(), - entry_id, - entry_height, - ); + let ix = storage_instruction::advertise_recent_blockhash(&keypair.pubkey(), entry_id, slot); instruction_sender.send(ix)?; seed.copy_from_slice(&signature.to_bytes()[..32]); let mut rng = ChaChaRng::from_seed(seed); - state.write().unwrap().entry_height = entry_height; + state.write().unwrap().slot = slot; // Regenerate the answers - let num_segments = (entry_height / ENTRIES_PER_SEGMENT) as usize; + let num_segments = get_segment_from_slot(slot) as usize; if num_segments == 0 { info!("Ledger has 0 segments!"); return Ok(()); @@ -354,91 +335,99 @@ impl StorageStage { Ok(()) } + fn process_storage_transaction( + data: &[u8], + slot: u64, + storage_state: &Arc>, + current_key_idx: &mut usize, + transaction_key0: Pubkey, + ) { + match deserialize(data) { + Ok(StorageInstruction::SubmitMiningProof { + slot: proof_slot, + signature, + .. + }) => { + if proof_slot < slot { + { + debug!( + "generating storage_keys from storage txs current_key_idx: {}", + *current_key_idx + ); + let storage_keys = &mut storage_state.write().unwrap().storage_keys; + storage_keys[*current_key_idx..*current_key_idx + size_of::()] + .copy_from_slice(signature.as_ref()); + *current_key_idx += size_of::(); + *current_key_idx %= storage_keys.len(); + } + + let mut statew = storage_state.write().unwrap(); + let max_segment_index = get_segment_from_slot(slot) as usize; + if statew.replicator_map.len() <= max_segment_index { + statew + .replicator_map + .resize(max_segment_index, HashSet::new()); + } + let proof_segment_index = get_segment_from_slot(proof_slot) as usize; + if proof_segment_index < statew.replicator_map.len() { + statew.replicator_map[proof_segment_index].insert(transaction_key0); + } + } + debug!("storage proof: slot: {}", slot); + } + Ok(_) => {} + Err(e) => { + info!("error: {:?}", e); + } + } + } + fn process_entries( keypair: &Arc, storage_state: &Arc>, - entry_receiver: &EntryReceiver, + slot_receiver: &Receiver, blocktree: &Arc, - poh_height: &mut u64, - entry_height: &mut u64, + slot_count: &mut u64, current_key_idx: &mut usize, storage_rotate_count: u64, instruction_sender: &InstructionSender, ) -> Result<()> { let timeout = Duration::new(1, 0); - let entries: Vec = entry_receiver.recv_timeout(timeout)?; - for entry in entries { - // Go through the transactions, find proofs, and use them to update - // the storage_keys with their signatures - for tx in entry.transactions { - let message = tx.message(); - for instruction in &message.instructions { - let program_id = instruction.program_id(message.program_ids()); - if solana_storage_api::check_id(program_id) { - match deserialize(&instruction.data) { - Ok(StorageInstruction::SubmitMiningProof { - entry_height: proof_entry_height, - signature, - .. - }) => { - if proof_entry_height < *entry_height { - { - debug!( - "generating storage_keys from storage txs current_key_idx: {}", - *current_key_idx - ); - let storage_keys = - &mut storage_state.write().unwrap().storage_keys; - storage_keys[*current_key_idx - ..*current_key_idx + size_of::()] - .copy_from_slice(signature.as_ref()); - *current_key_idx += size_of::(); - *current_key_idx %= storage_keys.len(); - } - - let mut statew = storage_state.write().unwrap(); - let max_segment_index = - (*entry_height / ENTRIES_PER_SEGMENT) as usize; - if statew.replicator_map.len() <= max_segment_index { - statew - .replicator_map - .resize(max_segment_index, HashSet::new()); - } - let proof_segment_index = - (proof_entry_height / ENTRIES_PER_SEGMENT) as usize; - if proof_segment_index < statew.replicator_map.len() { - statew.replicator_map[proof_segment_index] - .insert(message.account_keys[0]); - } - } - debug!("storage proof: entry_height: {}", entry_height); - } - Ok(_) => {} - Err(e) => { - info!("error: {:?}", e); - } + let slot: u64 = slot_receiver.recv_timeout(timeout)?; + storage_state.write().unwrap().slot = slot; + *slot_count += 1; + if let Ok(entries) = blocktree.get_slot_entries(slot, 0, None) { + for entry in entries { + // Go through the transactions, find proofs, and use them to update + // the storage_keys with their signatures + for tx in entry.transactions { + for (i, program_id) in tx.message.program_ids().iter().enumerate() { + if solana_storage_api::check_id(&program_id) { + Self::process_storage_transaction( + &tx.message().instructions[i].data, + slot, + storage_state, + current_key_idx, + tx.message.account_keys[0], + ); } } } + if *slot_count % storage_rotate_count == 0 { + debug!( + "crosses sending at slot: {}! hashes: {}", + slot, entry.num_hashes + ); + Self::process_entry_crossing( + &storage_state, + &keypair, + &blocktree, + entry.hash, + slot, + instruction_sender, + )?; + } } - if cross_boundary!(*poh_height, entry.num_hashes, storage_rotate_count) { - trace!( - "crosses sending at poh_height: {} entry_height: {}! hashes: {}", - *poh_height, - entry_height, - entry.num_hashes - ); - Self::process_entry_crossing( - &storage_state, - &keypair, - &blocktree, - entry.hash, - *entry_height, - instruction_sender, - )?; - } - *entry_height += 1; - *poh_height += entry.num_hashes; } Ok(()) } @@ -485,16 +474,15 @@ mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(1000); let bank = Arc::new(Bank::new(&genesis_block)); let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank]))); - let (_storage_entry_sender, storage_entry_receiver) = channel(); + let (_slot_sender, slot_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( &storage_state, - storage_entry_receiver, + slot_receiver, None, &keypair, &storage_keypair, &exit.clone(), - 0, &bank_forks, STORAGE_ROTATE_TEST_COUNT, &cluster_info, @@ -521,30 +509,30 @@ mod tests { let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); let entries = make_tiny_test_entries(64); - let blocktree = Blocktree::open(&ledger_path).unwrap(); + let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); + let slot = 1; let bank = Arc::new(Bank::new(&genesis_block)); let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank]))); blocktree - .write_entries(1, 0, 0, ticks_per_slot, &entries) + .write_entries(slot, 0, 0, ticks_per_slot, &entries) .unwrap(); let cluster_info = test_cluster_info(&keypair.pubkey()); - let (storage_entry_sender, storage_entry_receiver) = channel(); + let (slot_sender, slot_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( &storage_state, - storage_entry_receiver, - Some(Arc::new(blocktree)), + slot_receiver, + Some(blocktree.clone()), &keypair, &storage_keypair, &exit.clone(), - 0, &bank_forks, STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); - storage_entry_sender.send(entries.clone()).unwrap(); + slot_sender.send(slot).unwrap(); let keypair = Keypair::new(); let hash = Hash::default(); @@ -552,8 +540,12 @@ mod tests { let mut result = storage_state.get_mining_result(&signature); assert_eq!(result, Hash::default()); - for _ in 0..9 { - storage_entry_sender.send(entries.clone()).unwrap(); + for i in slot..slot + 3 { + blocktree + .write_entries(i, 0, 0, ticks_per_slot, &entries) + .unwrap(); + + slot_sender.send(i).unwrap(); } for _ in 0..5 { result = storage_state.get_mining_result(&signature); @@ -590,7 +582,7 @@ mod tests { let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); let entries = make_tiny_test_entries(128); - let blocktree = Blocktree::open(&ledger_path).unwrap(); + let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); blocktree .write_entries(1, 0, 0, ticks_per_slot, &entries) .unwrap(); @@ -598,21 +590,20 @@ mod tests { let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(&[bank]))); let cluster_info = test_cluster_info(&keypair.pubkey()); - let (storage_entry_sender, storage_entry_receiver) = channel(); + let (slot_sender, slot_receiver) = channel(); let storage_state = StorageState::new(); let storage_stage = StorageStage::new( &storage_state, - storage_entry_receiver, - Some(Arc::new(blocktree)), + slot_receiver, + Some(blocktree.clone()), &keypair, &storage_keypair, &exit.clone(), - 0, &bank_forks, STORAGE_ROTATE_TEST_COUNT, &cluster_info, ); - storage_entry_sender.send(entries.clone()).unwrap(); + slot_sender.send(1).unwrap(); let mut reference_keys; { @@ -632,7 +623,10 @@ mod tests { let mining_txs = vec![mining_proof_tx]; let proof_entries = vec![Entry::new(&Hash::default(), 1, mining_txs)]; - storage_entry_sender.send(proof_entries).unwrap(); + blocktree + .write_entries(2, 0, 0, ticks_per_slot, &proof_entries) + .unwrap(); + slot_sender.send(2).unwrap(); for _ in 0..5 { { diff --git a/core/src/tpu.rs b/core/src/tpu.rs index ea4259b5b..67c76e7a9 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -6,7 +6,6 @@ use crate::blocktree::Blocktree; use crate::broadcast_stage::BroadcastStage; use crate::cluster_info::ClusterInfo; use crate::cluster_info_vote_listener::ClusterInfoVoteListener; -use crate::entry::EntrySender; use crate::fetch_stage::FetchStage; use crate::poh_recorder::{PohRecorder, WorkingBankEntries}; use crate::service::Service; @@ -39,7 +38,6 @@ impl Tpu { broadcast_socket: UdpSocket, sigverify_disabled: bool, blocktree: &Arc, - storage_entry_sender: EntrySender, exit: &Arc, genesis_blockhash: &Hash, ) -> Self { @@ -80,7 +78,6 @@ impl Tpu { entry_receiver, &exit, blocktree, - storage_entry_sender, genesis_blockhash, ); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d1b11f1a4..7fa5d3c98 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -16,9 +16,7 @@ use crate::bank_forks::BankForks; use crate::blob_fetch_stage::BlobFetchStage; use crate::blockstream_service::BlockstreamService; use crate::blocktree::Blocktree; -use crate::blocktree_processor::BankForksInfo; use crate::cluster_info::ClusterInfo; -use crate::entry::{EntryReceiver, EntrySender}; use crate::leader_schedule_cache::LeaderScheduleCache; use crate::poh_recorder::PohRecorder; use crate::replay_stage::ReplayStage; @@ -61,7 +59,6 @@ impl Tvu { vote_account: &Pubkey, voting_keypair: Option>, bank_forks: &Arc>, - bank_forks_info: &[BankForksInfo], cluster_info: &Arc>, sockets: Sockets, blocktree: Arc, @@ -71,8 +68,6 @@ impl Tvu { ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, - storage_entry_sender: EntrySender, - storage_entry_receiver: EntryReceiver, leader_schedule_cache: &Arc, exit: &Arc, genesis_blockhash: &Hash, @@ -115,7 +110,7 @@ impl Tvu { genesis_blockhash, ); - let (replay_stage, slot_full_receiver) = ReplayStage::new( + let (replay_stage, slot_full_receiver, root_slot_receiver) = ReplayStage::new( &keypair.pubkey(), vote_account, voting_keypair, @@ -126,7 +121,6 @@ impl Tvu { ledger_signal_receiver, subscriptions, poh_recorder, - storage_entry_sender, leader_schedule_cache, ); @@ -145,12 +139,11 @@ impl Tvu { let storage_keypair = Arc::new(Keypair::new()); let storage_stage = StorageStage::new( storage_state, - storage_entry_receiver, + root_slot_receiver, Some(blocktree), &keypair, &storage_keypair, &exit, - bank_forks_info[0].entry_height, // TODO: StorageStage needs to deal with BankForks somehow still &bank_forks, storage_rotate_count, &cluster_info, @@ -203,10 +196,6 @@ pub mod tests { let (genesis_block, _mint_keypair) = GenesisBlock::new(starting_balance); let bank_forks = BankForks::new(0, Bank::new(&genesis_block)); - let bank_forks_info = vec![BankForksInfo { - bank_slot: 0, - entry_height: 0, - }]; //start cluster_info1 let mut cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone()); @@ -221,13 +210,11 @@ pub mod tests { let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank, &blocktree); let voting_keypair = Keypair::new(); - let (storage_entry_sender, storage_entry_receiver) = channel(); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(Arc::new(voting_keypair)), &Arc::new(RwLock::new(bank_forks)), - &bank_forks_info, &cref1, { Sockets { @@ -243,8 +230,6 @@ pub mod tests { l_receiver, &Arc::new(RpcSubscriptions::default()), &poh_recorder, - storage_entry_sender, - storage_entry_receiver, &leader_schedule_cache, &exit, &Hash::default(), diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index b125aa7ba..c5ecb247a 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -84,7 +84,7 @@ fn test_replay() { let tvu_addr = target1.info.tvu; - let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) = + let (bank_forks, _bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) = fullnode::new_banks_from_blocktree(&blocktree_path, None); let bank = bank_forks.working_bank(); assert_eq!( @@ -105,12 +105,10 @@ fn test_replay() { { let (poh_service_exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank, &blocktree); - let (storage_sender, storage_receiver) = channel(); let tvu = Tvu::new( &voting_keypair.pubkey(), Some(Arc::new(voting_keypair)), &bank_forks, - &bank_forks_info, &cref1, { Sockets { @@ -126,8 +124,6 @@ fn test_replay() { ledger_signal_receiver, &Arc::new(RpcSubscriptions::default()), &poh_recorder, - storage_sender, - storage_receiver, &leader_schedule_cache, &exit, &solana_sdk::hash::Hash::default(), diff --git a/programs/storage_api/src/lib.rs b/programs/storage_api/src/lib.rs index e5c692529..4f292e28a 100644 --- a/programs/storage_api/src/lib.rs +++ b/programs/storage_api/src/lib.rs @@ -4,10 +4,10 @@ pub mod storage_processor; use solana_sdk::pubkey::Pubkey; -pub const ENTRIES_PER_SEGMENT: u64 = 16; +pub const SLOTS_PER_SEGMENT: u64 = 2; -pub fn get_segment_from_entry(entry_height: u64) -> usize { - (entry_height / ENTRIES_PER_SEGMENT) as usize +pub fn get_segment_from_slot(slot: u64) -> usize { + (slot / SLOTS_PER_SEGMENT) as usize } const STORAGE_PROGRAM_ID: [u8; 32] = [ diff --git a/programs/storage_api/src/storage_contract.rs b/programs/storage_api/src/storage_contract.rs index 7353408e9..83c89829f 100644 --- a/programs/storage_api/src/storage_contract.rs +++ b/programs/storage_api/src/storage_contract.rs @@ -1,4 +1,4 @@ -use crate::{get_segment_from_entry, ENTRIES_PER_SEGMENT}; +use crate::get_segment_from_slot; use log::*; use serde_derive::{Deserialize, Serialize}; use solana_sdk::account::Account; @@ -44,7 +44,7 @@ pub enum StorageContract { Default, ValidatorStorage { - entry_height: u64, + slot: u64, hash: Hash, lockout_validations: Vec>, reward_validations: Vec>, @@ -68,7 +68,7 @@ impl<'a> StorageAccount<'a> { &mut self, id: Pubkey, sha_state: Hash, - entry_height: u64, + slot: u64, signature: Signature, ) -> Result<(), InstructionError> { let mut storage_contract = &mut self.account.state()?; @@ -80,7 +80,7 @@ impl<'a> StorageAccount<'a> { }; if let StorageContract::ReplicatorStorage { proofs, .. } = &mut storage_contract { - let segment_index = get_segment_from_entry(entry_height); + let segment_index = get_segment_from_slot(slot); if segment_index > proofs.len() || proofs.is_empty() { proofs.resize(cmp::max(1, segment_index), Proof::default()); } @@ -91,8 +91,8 @@ impl<'a> StorageAccount<'a> { } debug!( - "Mining proof submitted with contract {:?} entry_height: {}", - sha_state, entry_height + "Mining proof submitted with contract {:?} slot: {}", + sha_state, slot ); let proof_info = Proof { @@ -110,12 +110,12 @@ impl<'a> StorageAccount<'a> { pub fn advertise_storage_recent_blockhash( &mut self, hash: Hash, - entry_height: u64, + slot: u64, ) -> Result<(), InstructionError> { let mut storage_contract = &mut self.account.state()?; if let StorageContract::Default = storage_contract { *storage_contract = StorageContract::ValidatorStorage { - entry_height: 0, + slot: 0, hash: Hash::default(), lockout_validations: vec![], reward_validations: vec![], @@ -123,14 +123,14 @@ impl<'a> StorageAccount<'a> { }; if let StorageContract::ValidatorStorage { - entry_height: state_entry_height, + slot: state_slot, hash: state_hash, reward_validations, lockout_validations, } = &mut storage_contract { - let original_segments = *state_entry_height / ENTRIES_PER_SEGMENT; - let segments = entry_height / ENTRIES_PER_SEGMENT; + let original_segments = get_segment_from_slot(*state_slot); + let segments = get_segment_from_slot(slot); debug!( "advertise new last id segments: {} orig: {}", segments, original_segments @@ -139,7 +139,7 @@ impl<'a> StorageAccount<'a> { return Err(InstructionError::InvalidArgument); } - *state_entry_height = entry_height; + *state_slot = slot; *state_hash = hash; // move lockout_validations to reward_validations @@ -154,14 +154,14 @@ impl<'a> StorageAccount<'a> { pub fn proof_validation( &mut self, - entry_height: u64, + slot: u64, proofs: Vec, replicator_accounts: &mut [StorageAccount], ) -> Result<(), InstructionError> { let mut storage_contract = &mut self.account.state()?; if let StorageContract::Default = storage_contract { *storage_contract = StorageContract::ValidatorStorage { - entry_height: 0, + slot: 0, hash: Hash::default(), lockout_validations: vec![], reward_validations: vec![], @@ -169,16 +169,16 @@ impl<'a> StorageAccount<'a> { }; if let StorageContract::ValidatorStorage { - entry_height: current_entry_height, + slot: current_slot, lockout_validations, .. } = &mut storage_contract { - if entry_height >= *current_entry_height { + if slot >= *current_slot { return Err(InstructionError::InvalidArgument); } - let segment_index = get_segment_from_entry(entry_height); + let segment_index = get_segment_from_slot(slot); let mut previous_proofs = replicator_accounts .iter_mut() .filter_map(|account| { @@ -224,7 +224,7 @@ impl<'a> StorageAccount<'a> { pub fn claim_storage_reward( &mut self, - entry_height: u64, + slot: u64, tick_height: u64, ) -> Result<(), InstructionError> { let mut storage_contract = &mut self.account.state()?; @@ -236,7 +236,7 @@ impl<'a> StorageAccount<'a> { reward_validations, .. } = &mut storage_contract { - let claims_index = get_segment_from_entry(entry_height); + let claims_index = get_segment_from_slot(slot); let _num_validations = count_valid_proofs(&reward_validations[claims_index]); // TODO can't just create lamports out of thin air // self.account.lamports += TOTAL_VALIDATOR_REWARDS * num_validations; @@ -248,8 +248,8 @@ impl<'a> StorageAccount<'a> { { // if current tick height is a full segment away? then allow reward collection // storage needs to move to tick heights too, until then this makes little sense - let current_index = get_segment_from_entry(tick_height); - let claims_index = get_segment_from_entry(entry_height); + let current_index = get_segment_from_slot(tick_height); + let claims_index = get_segment_from_slot(slot); if current_index <= claims_index || claims_index >= reward_validations.len() { debug!( "current {:?}, claim {:?}, rewards {:?}", @@ -348,7 +348,7 @@ mod tests { } contract = StorageContract::ValidatorStorage { - entry_height: 0, + slot: 0, hash: Hash::default(), lockout_validations: vec![], reward_validations: vec![], diff --git a/programs/storage_api/src/storage_instruction.rs b/programs/storage_api/src/storage_instruction.rs index 7fd22b190..3e5b00329 100644 --- a/programs/storage_api/src/storage_instruction.rs +++ b/programs/storage_api/src/storage_instruction.rs @@ -11,18 +11,18 @@ use solana_sdk::signature::Signature; pub enum StorageInstruction { SubmitMiningProof { sha_state: Hash, - entry_height: u64, + slot: u64, signature: Signature, }, AdvertiseStorageRecentBlockhash { hash: Hash, - entry_height: u64, + slot: u64, }, ClaimStorageReward { - entry_height: u64, + slot: u64, }, ProofValidation { - entry_height: u64, + slot: u64, proofs: Vec, }, } @@ -30,12 +30,12 @@ pub enum StorageInstruction { pub fn mining_proof( from_pubkey: &Pubkey, sha_state: Hash, - entry_height: u64, + slot: u64, signature: Signature, ) -> Instruction { let storage_instruction = StorageInstruction::SubmitMiningProof { sha_state, - entry_height, + slot, signature, }; let account_metas = vec![AccountMeta::new(*from_pubkey, true)]; @@ -45,34 +45,27 @@ pub fn mining_proof( pub fn advertise_recent_blockhash( from_pubkey: &Pubkey, storage_hash: Hash, - entry_height: u64, + slot: u64, ) -> Instruction { let storage_instruction = StorageInstruction::AdvertiseStorageRecentBlockhash { hash: storage_hash, - entry_height, + slot, }; let account_metas = vec![AccountMeta::new(*from_pubkey, true)]; Instruction::new(id(), &storage_instruction, account_metas) } -pub fn proof_validation( - from_pubkey: &Pubkey, - entry_height: u64, - proofs: Vec, -) -> Instruction { +pub fn proof_validation(from_pubkey: &Pubkey, slot: u64, proofs: Vec) -> Instruction { let mut account_metas = vec![AccountMeta::new(*from_pubkey, true)]; proofs.iter().for_each(|checked_proof| { account_metas.push(AccountMeta::new(checked_proof.proof.id, false)) }); - let storage_instruction = StorageInstruction::ProofValidation { - entry_height, - proofs, - }; + let storage_instruction = StorageInstruction::ProofValidation { slot, proofs }; Instruction::new(id(), &storage_instruction, account_metas) } -pub fn reward_claim(from_pubkey: &Pubkey, entry_height: u64) -> Instruction { - let storage_instruction = StorageInstruction::ClaimStorageReward { entry_height }; +pub fn reward_claim(from_pubkey: &Pubkey, slot: u64) -> Instruction { + let storage_instruction = StorageInstruction::ClaimStorageReward { slot }; let account_metas = vec![AccountMeta::new(*from_pubkey, true)]; Instruction::new(id(), &storage_instruction, account_metas) } diff --git a/programs/storage_api/src/storage_processor.rs b/programs/storage_api/src/storage_processor.rs index 18656026f..695d1ffed 100644 --- a/programs/storage_api/src/storage_processor.rs +++ b/programs/storage_api/src/storage_processor.rs @@ -37,44 +37,36 @@ pub fn process_instruction( match bincode::deserialize(data).map_err(|_| InstructionError::InvalidInstructionData)? { StorageInstruction::SubmitMiningProof { sha_state, - entry_height, + slot, signature, } => { if num_keyed_accounts != 1 { Err(InstructionError::InvalidArgument)?; } - storage_account.submit_mining_proof( - storage_account_pubkey, - sha_state, - entry_height, - signature, - ) + storage_account.submit_mining_proof(storage_account_pubkey, sha_state, slot, signature) } - StorageInstruction::AdvertiseStorageRecentBlockhash { hash, entry_height } => { + StorageInstruction::AdvertiseStorageRecentBlockhash { hash, slot } => { if num_keyed_accounts != 1 { // keyed_accounts[0] should be the main storage key // to access its data Err(InstructionError::InvalidArgument)?; } - storage_account.advertise_storage_recent_blockhash(hash, entry_height) + storage_account.advertise_storage_recent_blockhash(hash, slot) } - StorageInstruction::ClaimStorageReward { entry_height } => { + StorageInstruction::ClaimStorageReward { slot } => { if num_keyed_accounts != 1 { // keyed_accounts[0] should be the main storage key // to access its data Err(InstructionError::InvalidArgument)?; } - storage_account.claim_storage_reward(entry_height, tick_height) + storage_account.claim_storage_reward(slot, tick_height) } - StorageInstruction::ProofValidation { - entry_height, - proofs, - } => { + StorageInstruction::ProofValidation { slot, proofs } => { if num_keyed_accounts == 1 { // have to have at least 1 replicator to do any verification Err(InstructionError::InvalidArgument)?; } - storage_account.proof_validation(entry_height, proofs, &mut rest) + storage_account.proof_validation(slot, proofs, &mut rest) } } } @@ -85,7 +77,7 @@ mod tests { use crate::id; use crate::storage_contract::{CheckedProof, Proof, ProofStatus, StorageContract}; use crate::storage_instruction; - use crate::ENTRIES_PER_SEGMENT; + use crate::SLOTS_PER_SEGMENT; use bincode::deserialize; use solana_runtime::bank::Bank; use solana_runtime::bank_client::BankClient; @@ -134,7 +126,7 @@ mod tests { let ix = storage_instruction::advertise_recent_blockhash( &pubkey, Hash::default(), - ENTRIES_PER_SEGMENT, + SLOTS_PER_SEGMENT, ); assert_eq!( @@ -158,7 +150,7 @@ mod tests { } #[test] - fn test_submit_mining_invalid_entry_height() { + fn test_submit_mining_invalid_slot() { solana_logger::setup(); let pubkey = Pubkey::new_rand(); let mut accounts = [Account::default(), Account::default()]; @@ -197,7 +189,7 @@ mod tests { let mut bank = Bank::new(&genesis_block); bank.add_instruction_processor(id(), process_instruction); - let entry_height = 0; + let slot = 0; let bank_client = BankClient::new(bank); let ix = system_instruction::create_account(&mint_pubkey, &validator, 10, 4 * 1042, &id()); @@ -209,7 +201,7 @@ mod tests { let ix = storage_instruction::advertise_recent_blockhash( &validator, Hash::default(), - ENTRIES_PER_SEGMENT, + SLOTS_PER_SEGMENT, ); bank_client @@ -219,7 +211,7 @@ mod tests { let ix = storage_instruction::mining_proof( &replicator, Hash::default(), - entry_height, + slot, Signature::default(), ); bank_client @@ -229,7 +221,7 @@ mod tests { let ix = storage_instruction::advertise_recent_blockhash( &validator, Hash::default(), - ENTRIES_PER_SEGMENT * 2, + SLOTS_PER_SEGMENT * 2, ); bank_client .send_instruction(&validator_keypair, ix) @@ -237,7 +229,7 @@ mod tests { let ix = storage_instruction::proof_validation( &validator, - entry_height, + slot, vec![CheckedProof { proof: Proof { id: replicator, @@ -254,13 +246,13 @@ mod tests { let ix = storage_instruction::advertise_recent_blockhash( &validator, Hash::default(), - ENTRIES_PER_SEGMENT * 3, + SLOTS_PER_SEGMENT * 3, ); bank_client .send_instruction(&validator_keypair, ix) .unwrap(); - let ix = storage_instruction::reward_claim(&validator, entry_height); + let ix = storage_instruction::reward_claim(&validator, slot); bank_client .send_instruction(&validator_keypair, ix) .unwrap(); @@ -274,7 +266,7 @@ mod tests { // bank.register_tick(&bank.last_blockhash()); //} - let ix = storage_instruction::reward_claim(&replicator, entry_height); + let ix = storage_instruction::reward_claim(&replicator, slot); bank_client .send_instruction(&replicator_keypair, ix) .unwrap(); @@ -283,21 +275,21 @@ mod tests { // assert_eq!(bank_client.get_balance(&replicator).unwrap(), TOTAL_REPLICATOR_REWARDS); } - fn get_storage_entry_height(client: &C, account: &Pubkey) -> u64 { + fn get_storage_slot(client: &C, account: &Pubkey) -> u64 { match client.get_account_data(&account).unwrap() { Some(storage_system_account_data) => { let contract = deserialize(&storage_system_account_data); if let Ok(contract) = contract { match contract { - StorageContract::ValidatorStorage { entry_height, .. } => { - return entry_height; + StorageContract::ValidatorStorage { slot, .. } => { + return slot; } - _ => info!("error in reading entry_height"), + _ => info!("error in reading slot"), } } } None => { - info!("error in reading entry_height"); + info!("error in reading slot"); } } 0 @@ -357,18 +349,18 @@ mod tests { let ix = storage_instruction::advertise_recent_blockhash( &validator_pubkey, storage_blockhash, - ENTRIES_PER_SEGMENT, + SLOTS_PER_SEGMENT, ); bank_client .send_instruction(&validator_keypair, ix) .unwrap(); - let entry_height = 0; + let slot = 0; let ix = storage_instruction::mining_proof( &replicator_pubkey, Hash::default(), - entry_height, + slot, Signature::default(), ); let _result = bank_client @@ -376,8 +368,8 @@ mod tests { .unwrap(); assert_eq!( - get_storage_entry_height(&bank_client, &validator_pubkey), - ENTRIES_PER_SEGMENT + get_storage_slot(&bank_client, &validator_pubkey), + SLOTS_PER_SEGMENT ); assert_eq!( get_storage_blockhash(&bank_client, &validator_pubkey),