RocksDb window service (#1888)

* Add db_window module for windowing functions from RocksDb

* Replace window with db_window functions in window_service

* Fix tests

* Make note of change in db_window

* Create RocksDb ledger in bin/fullnode

* Make db_ledger functions generic

* Add db_ledger to bin/replicator
carllin 2018-11-24 19:32:33 -08:00 committed by GitHub
parent 69802e141f
commit 57a384d6a0
9 changed files with 384 additions and 455 deletions
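
A minimal usage sketch of the generified DbLedger write path introduced by this change (open a RocksDb ledger, write entries from any IntoIterator whose items borrow as Entry, then destroy it), assuming the public helpers exercised in the diffs below; the ledger name is illustrative:

extern crate solana;

use solana::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT};
use solana::ledger::{get_tmp_ledger_path, make_tiny_test_entries};

fn main() {
    let ledger_path = get_tmp_ledger_path("db_ledger_usage_sketch");
    {
        let mut db_ledger =
            DbLedger::open(&ledger_path).expect("Expected to be able to open database ledger");
        // write_entries is generic over `I: IntoIterator, I::Item: Borrow<Entry>`,
        // so a slice, a Vec<Entry>, or an iterator adapter can be passed directly.
        let entries = make_tiny_test_entries(8);
        db_ledger
            .write_entries(DEFAULT_SLOT_HEIGHT, &entries)
            .expect("Expected successful write of entries");
    }
    // Destroying the database without closing (dropping) it first is undefined behavior,
    // hence the scope above ends before destroy is called.
    DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}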

View File

@ -5,13 +5,15 @@
use bincode::{deserialize, serialize};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use entry::Entry;
use ledger::Block;
use packet::{Blob, SharedBlob, BLOB_HEADER_SIZE};
use result::{Error, Result};
use rocksdb::{ColumnFamily, Options, WriteBatch, DB};
use serde::de::DeserializeOwned;
use serde::Serialize;
use solana_sdk::pubkey::Pubkey;
use std::borrow::Borrow;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
pub const DB_LEDGER_DIRECTORY: &str = "db_ledger";
@ -232,6 +234,8 @@ pub const ERASURE_CF: &str = "erasure";
impl DbLedger {
// Opens a Ledger in directory, provides "infinite" window of blobs
pub fn open(ledger_path: &str) -> Result<Self> {
let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
// Use default database options
let mut options = Options::default();
options.create_if_missing(true);
@ -260,10 +264,25 @@ impl DbLedger {
})
}
pub fn write_shared_blobs(&mut self, slot: u64, shared_blobs: &[SharedBlob]) -> Result<()> {
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
self.write_blobs(slot, &blobs)
pub fn destroy(ledger_path: &str) -> Result<()> {
let ledger_path = format!("{}/{}", ledger_path, DB_LEDGER_DIRECTORY);
DB::destroy(&Options::default(), &ledger_path)?;
Ok(())
}
pub fn write_shared_blobs<I>(&mut self, slot: u64, shared_blobs: I) -> Result<()>
where
I: IntoIterator,
I::Item: Borrow<SharedBlob>,
{
for b in shared_blobs {
let bl = b.borrow().read().unwrap();
let index = bl.index()?;
let key = DataCf::key(slot, index);
self.insert_data_blob(&key, &*bl)?;
}
Ok(())
}
pub fn write_blobs<'a, I>(&mut self, slot: u64, blobs: I) -> Result<()>
@ -278,12 +297,20 @@ impl DbLedger {
Ok(())
}
pub fn write_entries(&mut self, slot: u64, entries: &[Entry]) -> Result<()> {
let shared_blobs = entries.to_blobs();
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
self.write_blobs(slot, &blobs)?;
Ok(())
pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<()>
where
I: IntoIterator,
I::Item: Borrow<Entry>,
{
let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
entry.borrow().to_blob(
Some(idx as u64),
Some(Pubkey::default()),
Some(&default_addr),
)
});
self.write_shared_blobs(slot, shared_blobs)
}
pub fn insert_data_blob(&self, key: &[u8], new_blob: &Blob) -> Result<Vec<Entry>> {
@ -421,12 +448,17 @@ impl DbLedger {
}
}
pub fn write_entries_to_ledger(ledger_paths: &[String], entries: &[Entry]) {
pub fn write_entries_to_ledger<I>(ledger_paths: &[&str], entries: I)
where
I: IntoIterator,
I::Item: Borrow<Entry>,
{
let mut entries = entries.into_iter();
for ledger_path in ledger_paths {
let mut db_ledger =
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
db_ledger
.write_entries(DEFAULT_SLOT_HEIGHT, &entries)
.write_entries(DEFAULT_SLOT_HEIGHT, entries.by_ref())
.expect("Expected successful write of genesis entries");
}
}
@ -435,7 +467,6 @@ pub fn write_entries_to_ledger(ledger_paths: &[String], entries: &[Entry]) {
mod tests {
use super::*;
use ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
use rocksdb::{Options, DB};
#[test]
fn test_put_get_simple() {
@ -485,8 +516,7 @@ mod tests {
// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
@ -548,8 +578,7 @@ mod tests {
// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
@ -591,8 +620,7 @@ mod tests {
// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
@ -628,8 +656,7 @@ mod tests {
// Destroying database without closing it first is undefined behavior
drop(ledger);
DB::destroy(&Options::default(), &ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
@ -644,7 +671,7 @@ mod tests {
let num_entries = 8;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
for (b, i) in shared_blobs.iter().zip(0..num_entries) {
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(1 << (i * 8)).unwrap();
}
@ -668,7 +695,6 @@ mod tests {
db_iterator.next();
}
}
DB::destroy(&Options::default(), &db_ledger_path)
.expect("Expected successful database destruction");
DbLedger::destroy(&db_ledger_path).expect("Expected successful database destruction");
}
}

View File

@ -218,9 +218,10 @@ pub fn retransmit_all_leader_blocks(
for b in dq {
// Check if the blob is from the scheduled leader for its slot. If so,
// add to the retransmit_queue
let slot = b.read().unwrap().slot()?;
if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) {
add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
if let Ok(slot) = b.read().unwrap().slot() {
if let Some(leader_id) = leader_scheduler.get_leader_for_slot(slot) {
add_blob_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
}
}
}
@ -273,6 +274,9 @@ pub fn process_blob(
let is_coding = blob.read().unwrap().is_coding();
// Check if the blob is in the range of our known leaders. If not, we return.
// TODO: Need to update slot in broadcast, otherwise this check will fail with
// leader rotation enabled
// Github issue: https://github.com/solana-labs/solana/issues/1899.
let slot = blob.read().unwrap().slot()?;
let leader = leader_scheduler.get_leader_for_slot(slot);
@ -292,12 +296,11 @@ pub fn process_blob(
)?;
vec![]
} else {
let data_key = ErasureCf::key(slot, pix);
let data_key = DataCf::key(slot, pix);
db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
};
// TODO: Once erasure is fixed, re-add that logic here
for entry in &consumed_entries {
*tick_height += entry.is_tick() as u64;
}
@ -529,8 +532,8 @@ mod test {
assert!(gap > 3);
let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
for (b, i) in shared_blobs.iter().zip(0..shared_blobs.len() as u64) {
b.write().unwrap().set_index(i * gap).unwrap();
for (i, b) in shared_blobs.iter().enumerate() {
b.write().unwrap().set_index(i as u64 * gap).unwrap();
}
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();

View File

@ -3,6 +3,7 @@
use bank::Bank;
use broadcast_stage::BroadcastStage;
use cluster_info::{ClusterInfo, Node, NodeInfo};
use db_ledger::{write_entries_to_ledger, DbLedger};
use leader_scheduler::LeaderScheduler;
use ledger::read_ledger;
use ncp::Ncp;
@ -106,6 +107,7 @@ pub struct Fullnode {
broadcast_socket: UdpSocket,
rpc_addr: SocketAddr,
rpc_pubsub_addr: SocketAddr,
db_ledger: Arc<RwLock<DbLedger>>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
@ -258,6 +260,10 @@ impl Fullnode {
.expect("Leader not known after processing bank");
cluster_info.write().unwrap().set_leader(scheduled_leader);
// Create the RocksDb ledger
let db_ledger = Self::make_db_ledger(ledger_path);
let node_role = if scheduled_leader != keypair.pubkey() {
// Start in validator mode.
let tvu = Tvu::new(
@ -267,7 +273,6 @@ impl Fullnode {
entry_height,
*last_entry_id,
cluster_info.clone(),
shared_window.clone(),
node.sockets
.replicate
.iter()
@ -282,6 +287,7 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(ledger_path),
db_ledger.clone(),
);
let tpu_forwarder = TpuForwarder::new(
node.sockets
@ -352,6 +358,7 @@ impl Fullnode {
broadcast_socket: node.sockets.broadcast,
rpc_addr,
rpc_pubsub_addr,
db_ledger,
}
}
@ -423,7 +430,6 @@ impl Fullnode {
entry_height,
last_entry_id,
self.cluster_info.clone(),
self.shared_window.clone(),
self.replicate_socket
.iter()
.map(|s| s.try_clone().expect("Failed to clone replicate sockets"))
@ -435,6 +441,7 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(&self.ledger_path),
self.db_ledger.clone(),
);
let tpu_forwarder = TpuForwarder::new(
self.transaction_sockets
@ -589,6 +596,19 @@ impl Fullnode {
),
)
}
fn make_db_ledger(ledger_path: &str) -> Arc<RwLock<DbLedger>> {
// Destroy any existing instances of the RocksDb ledger
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
let ledger_entries = read_ledger(ledger_path, true)
.expect("opening ledger")
.map(|entry| entry.unwrap());
write_entries_to_ledger(&[ledger_path], ledger_entries);
let db =
DbLedger::open(ledger_path).expect("Expected to successfully open database ledger");
Arc::new(RwLock::new(db))
}
}
impl Service for Fullnode {
@ -626,9 +646,10 @@ impl Service for Fullnode {
mod tests {
use bank::Bank;
use cluster_info::Node;
use db_ledger::*;
use fullnode::{Fullnode, FullnodeReturnType, NodeRole, TvuReturnType};
use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use ledger::{create_tmp_genesis, create_tmp_sample_ledger, LedgerWriter};
use ledger::{create_tmp_genesis, create_tmp_sample_ledger, tmp_copy_ledger, LedgerWriter};
use packet::make_consecutive_blobs;
use service::Service;
use signature::{Keypair, KeypairUtil};
@ -839,6 +860,13 @@ mod tests {
+ num_ending_ticks as u64;
ledger_writer.write_entries(&active_set_entries).unwrap();
let validator_ledger_path =
tmp_copy_ledger(&bootstrap_leader_ledger_path, "test_wrong_role_transition");
let ledger_paths = vec![
bootstrap_leader_ledger_path.clone(),
validator_ledger_path.clone(),
];
// Create the common leader scheduling configuration
let num_slots_per_epoch = 3;
let leader_rotation_interval = 5;
@ -855,45 +883,53 @@ mod tests {
Some(genesis_tick_height),
);
// Test that a node knows to transition to a validator based on parsing the ledger
let leader_vote_account_keypair = Arc::new(Keypair::new());
let bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
bootstrap_leader_keypair,
leader_vote_account_keypair,
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
);
{
// Test that a node knows to transition to a validator based on parsing the ledger
let leader_vote_account_keypair = Arc::new(Keypair::new());
let bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
bootstrap_leader_keypair,
leader_vote_account_keypair,
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
);
match bootstrap_leader.node_role {
Some(NodeRole::Validator(_)) => (),
_ => {
panic!("Expected bootstrap leader to be a validator");
match bootstrap_leader.node_role {
Some(NodeRole::Validator(_)) => (),
_ => {
panic!("Expected bootstrap leader to be a validator");
}
}
}
// Test that a node knows to transition to a leader based on parsing the ledger
let validator = Fullnode::new(
validator_node,
&bootstrap_leader_ledger_path,
Arc::new(validator_keypair),
Arc::new(validator_vote_account_keypair),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
);
// Test that a node knows to transition to a leader based on parsing the ledger
let validator = Fullnode::new(
validator_node,
&validator_ledger_path,
Arc::new(validator_keypair),
Arc::new(validator_vote_account_keypair),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
);
match validator.node_role {
Some(NodeRole::Leader(_)) => (),
_ => {
panic!("Expected node to be the leader");
match validator.node_role {
Some(NodeRole::Leader(_)) => (),
_ => {
panic!("Expected node to be the leader");
}
}
validator.close().expect("Expected node to close");
bootstrap_leader.close().expect("Expected node to close");
}
for path in ledger_paths {
DbLedger::destroy(&path).expect("Expected successful database destruction");
let _ignored = remove_dir_all(&path);
}
let _ignored = remove_dir_all(&bootstrap_leader_ledger_path);
}
#[test]
@ -1035,6 +1071,8 @@ mod tests {
// Shut down
t_responder.join().expect("responder thread join");
validator.close().unwrap();
remove_dir_all(&validator_ledger_path).unwrap();
DbLedger::destroy(&validator_ledger_path)
.expect("Expected successful database destruction");
let _ignored = remove_dir_all(&validator_ledger_path).unwrap();
}
}

View File

@ -13,7 +13,7 @@ use rayon::prelude::*;
use signature::{Keypair, KeypairUtil};
use solana_sdk::hash::{hash, Hash};
use solana_sdk::pubkey::Pubkey;
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
use std::fs::{copy, create_dir_all, remove_dir_all, File, OpenOptions};
use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
use std::mem::size_of;
@ -638,6 +638,22 @@ pub fn create_tmp_sample_ledger(
(mint, path, genesis)
}
pub fn tmp_copy_ledger(from: &str, name: &str) -> String {
let tostr = get_tmp_ledger_path(name);
{
let to = Path::new(&tostr);
let from = Path::new(&from);
create_dir_all(to).unwrap();
copy(from.join("data"), to.join("data")).unwrap();
copy(from.join("index"), to.join("index")).unwrap();
}
tostr
}
pub fn make_tiny_test_entries(num: usize) -> Vec<Entry> {
let zero = Hash::default();
let one = hash(&zero.as_ref());

View File

@ -1,5 +1,6 @@
use blob_fetch_stage::BlobFetchStage;
use cluster_info::{ClusterInfo, Node, NodeInfo};
use db_ledger::DbLedger;
use leader_scheduler::LeaderScheduler;
use ncp::Ncp;
use service::Service;
@ -104,9 +105,20 @@ impl Replicator {
let (entry_window_sender, entry_window_receiver) = channel();
// todo: pull blobs off the retransmit_receiver and recycle them?
let (retransmit_sender, retransmit_receiver) = channel();
// Create the RocksDb ledger. Eventually we will simply repurpose the input
// ledger path as the RocksDb ledger path once we replace the ledger with
// RocksDb. Note that for now, this ledger will not contain any of the existing
// entries in the ledger located at ledger_path; it will only append newly
// received entries after being passed to window_service.
let db_ledger = Arc::new(RwLock::new(
DbLedger::open(&ledger_path.unwrap())
.expect("Expected to be able to open database ledger"),
));
let t_window = window_service(
db_ledger,
cluster_info.clone(),
shared_window.clone(),
0,
entry_height,
max_entry_height,
@ -165,6 +177,7 @@ impl Replicator {
mod tests {
use client::mk_client;
use cluster_info::Node;
use db_ledger::DbLedger;
use fullnode::Fullnode;
use leader_scheduler::LeaderScheduler;
use ledger::{create_tmp_genesis, get_tmp_ledger_path, read_ledger};
@ -204,67 +217,73 @@ mod tests {
let (mint, leader_ledger_path) =
create_tmp_genesis(leader_ledger_path, 100, leader_info.id, 1);
let leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
vote_account_keypair,
None,
false,
LeaderScheduler::from_bootstrap_leader(leader_info.id),
None,
);
{
let leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
vote_account_keypair,
None,
false,
LeaderScheduler::from_bootstrap_leader(leader_info.id),
None,
);
let mut leader_client = mk_client(&leader_info);
let mut leader_client = mk_client(&leader_info);
let bob = Keypair::new();
let bob = Keypair::new();
let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
let replicator_keypair = Keypair::new();
info!("starting replicator node");
let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey());
let (replicator, _leader_info) = Replicator::new(
entry_height,
1,
&exit,
Some(replicator_ledger_path),
replicator_node,
Some(network_addr),
done.clone(),
);
let mut num_entries = 0;
for _ in 0..60 {
match read_ledger(replicator_ledger_path, true) {
Ok(entries) => {
for _ in entries {
num_entries += 1;
}
info!("{} entries", num_entries);
if num_entries > 0 {
break;
}
}
Err(e) => {
info!("error reading ledger: {:?}", e);
}
}
sleep(Duration::from_millis(300));
let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
let replicator_keypair = Keypair::new();
info!("starting replicator node");
let replicator_node = Node::new_localhost_with_pubkey(replicator_keypair.pubkey());
let (replicator, _leader_info) = Replicator::new(
entry_height,
1,
&exit,
Some(replicator_ledger_path),
replicator_node,
Some(network_addr),
done.clone(),
);
let mut num_entries = 0;
for _ in 0..60 {
match read_ledger(replicator_ledger_path, true) {
Ok(entries) => {
for _ in entries {
num_entries += 1;
}
info!("{} entries", num_entries);
if num_entries > 0 {
break;
}
}
Err(e) => {
info!("error reading ledger: {:?}", e);
}
}
sleep(Duration::from_millis(300));
let last_id = leader_client.get_last_id();
leader_client
.transfer(1, &mint.keypair(), bob.pubkey(), &last_id)
.unwrap();
}
assert_eq!(done.load(Ordering::Relaxed), true);
assert!(num_entries > 0);
exit.store(true, Ordering::Relaxed);
replicator.join();
leader.exit();
}
assert_eq!(done.load(Ordering::Relaxed), true);
assert!(num_entries > 0);
exit.store(true, Ordering::Relaxed);
replicator.join();
leader.exit();
DbLedger::destroy(&leader_ledger_path).expect("Expected successful database destruction");
DbLedger::destroy(&replicator_ledger_path)
.expect("Expected successful database destruction");
let _ignored = remove_dir_all(&leader_ledger_path);
let _ignored = remove_dir_all(&replicator_ledger_path);
}

View File

@ -2,6 +2,7 @@
use cluster_info::ClusterInfo;
use counter::Counter;
use db_ledger::DbLedger;
use entry::Entry;
use leader_scheduler::LeaderScheduler;
@ -17,7 +18,6 @@ use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use window::SharedWindow;
use window_service::window_service;
fn retransmit(
@ -81,8 +81,8 @@ pub struct RetransmitStage {
impl RetransmitStage {
pub fn new(
db_ledger: Arc<RwLock<DbLedger>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
window: SharedWindow,
tick_height: u64,
entry_height: u64,
retransmit_socket: Arc<UdpSocket>,
@ -97,8 +97,8 @@ impl RetransmitStage {
let (entry_sender, entry_receiver) = channel();
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(
db_ledger,
cluster_info.clone(),
window,
tick_height,
entry_height,
0,

View File

@ -13,6 +13,7 @@
use bank::Bank;
use blob_fetch_stage::BlobFetchStage;
use cluster_info::ClusterInfo;
use db_ledger::DbLedger;
use ledger_write_stage::LedgerWriteStage;
use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
use retransmit_stage::RetransmitStage;
@ -24,7 +25,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use storage_stage::{StorageStage, StorageState};
use window::SharedWindow;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum TvuReturnType {
@ -62,11 +62,11 @@ impl Tvu {
entry_height: u64,
last_entry_id: Hash,
cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow,
replicate_sockets: Vec<UdpSocket>,
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
ledger_path: Option<&str>,
db_ledger: Arc<RwLock<DbLedger>>,
) -> Self {
let exit = Arc::new(AtomicBool::new(false));
@ -76,12 +76,13 @@ impl Tvu {
blob_sockets.push(repair_socket.clone());
let (fetch_stage, blob_fetch_receiver) =
BlobFetchStage::new_multi_socket(blob_sockets, exit.clone());
//TODO
//the packets coming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let (retransmit_stage, blob_window_receiver) = RetransmitStage::new(
db_ledger,
&cluster_info,
window,
bank.tick_height(),
entry_height,
Arc::new(retransmit_socket),
@ -166,15 +167,19 @@ pub mod tests {
use bank::Bank;
use bincode::serialize;
use cluster_info::{ClusterInfo, Node};
use db_ledger::DbLedger;
use entry::Entry;
use leader_scheduler::LeaderScheduler;
use ledger::get_tmp_ledger_path;
use logger;
use mint::Mint;
use ncp::Ncp;
use packet::SharedBlob;
use rocksdb::{Options, DB};
use service::Service;
use signature::{Keypair, KeypairUtil};
use solana_sdk::hash::Hash;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
@ -262,6 +267,9 @@ pub mod tests {
let vote_account_keypair = Arc::new(Keypair::new());
let mut cur_hash = Hash::default();
let db_ledger_path = get_tmp_ledger_path("test_replicate");
let db_ledger =
DbLedger::open(&db_ledger_path).expect("Expected to successfully open ledger");
let tvu = Tvu::new(
Arc::new(target1_keypair),
vote_account_keypair,
@ -269,11 +277,11 @@ pub mod tests {
0,
cur_hash,
cref1,
dr_1.1,
target1.sockets.replicate,
target1.sockets.repair,
target1.sockets.retransmit,
None,
Arc::new(RwLock::new(db_ledger)),
);
let mut alice_ref_balance = starting_balance;
@ -346,5 +354,8 @@ pub mod tests {
dr_1.0.join().expect("join");
t_receiver.join().expect("join");
t_responder.join().expect("join");
DB::destroy(&Options::default(), &db_ledger_path)
.expect("Expected successful database destuction");
let _ignored = remove_dir_all(&db_ledger_path);
}
}

View File

@ -1,25 +1,26 @@
//! The `window_service` provides a thread for maintaining a window (tail of the ledger).
//!
use cluster_info::{ClusterInfo, NodeInfo};
use cluster_info::ClusterInfo;
use counter::Counter;
use db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT};
use db_window::*;
use entry::EntrySender;
use leader_scheduler::LeaderScheduler;
use log::Level;
use packet::SharedBlob;
use rand::{thread_rng, Rng};
use result::{Error, Result};
use solana_metrics::{influxdb, submit};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms;
use std::borrow::{Borrow, BorrowMut};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use streamer::{BlobReceiver, BlobSender};
use window::{SharedWindow, WindowUtil};
pub const MAX_REPAIR_BACKOFF: usize = 128;
@ -49,119 +50,21 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
thread_rng().gen_range(0, *times as u64) == 0
}
fn add_block_to_retransmit_queue(
b: &SharedBlob,
leader_id: Pubkey,
retransmit_queue: &mut Vec<SharedBlob>,
) {
let p = b.read().unwrap();
//TODO this check isn't safe against adversarial packets
//we need to maintain a sequence window
trace!(
"idx: {} addr: {:?} id: {:?} leader: {:?}",
p.index()
.expect("get_index in fn add_block_to_retransmit_queue"),
p.id()
.expect("get_id in trace! fn add_block_to_retransmit_queue"),
p.meta.addr(),
leader_id
);
if p.id().expect("get_id in fn add_block_to_retransmit_queue") == leader_id {
//TODO
//need to copy the retransmitted blob
//otherwise we get into races with which thread
//should do the recycling
//
let nv = SharedBlob::default();
{
let mut mnv = nv.write().unwrap();
let sz = p.meta.size;
mnv.meta.size = sz;
mnv.data[..sz].copy_from_slice(&p.data[..sz]);
}
retransmit_queue.push(nv);
}
}
fn retransmit_all_leader_blocks(
window: &SharedWindow,
maybe_leader: Option<NodeInfo>,
dq: &[SharedBlob],
id: &Pubkey,
consumed: u64,
received: u64,
retransmit: &BlobSender,
pending_retransmits: &mut bool,
) -> Result<()> {
let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
if let Some(leader) = maybe_leader {
let leader_id = leader.id;
for b in dq {
add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
}
if *pending_retransmits {
for w in window
.write()
.expect("Window write failed in retransmit_all_leader_blocks")
.iter_mut()
{
*pending_retransmits = false;
if w.leader_unknown {
if let Some(ref b) = w.data {
add_block_to_retransmit_queue(b, leader_id, &mut retransmit_queue);
w.leader_unknown = false;
}
}
}
}
submit(
influxdb::Point::new("retransmit-queue")
.add_field(
"count",
influxdb::Value::Integer(retransmit_queue.len() as i64),
).to_owned(),
);
} else {
warn!("{}: no leader to retransmit from", id);
}
if !retransmit_queue.is_empty() {
trace!(
"{}: RECV_WINDOW {} {}: retransmit {}",
id,
consumed,
received,
retransmit_queue.len(),
);
inc_new_counter_info!("streamer-recv_window-retransmit", retransmit_queue.len());
retransmit.send(retransmit_queue)?;
}
Ok(())
}
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
fn recv_window(
window: &SharedWindow,
db_ledger: &mut DbLedger,
id: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
consumed: &mut u64,
received: &mut u64,
leader_scheduler: &LeaderScheduler,
tick_height: &mut u64,
max_ix: u64,
r: &BlobReceiver,
s: &EntrySender,
retransmit: &BlobSender,
pending_retransmits: &mut bool,
done: &Arc<AtomicBool>,
) -> Result<()> {
let timer = Duration::from_millis(200);
let mut dq = r.recv_timeout(timer)?;
let maybe_leader: Option<NodeInfo> = cluster_info
.read()
.expect("'cluster_info' read lock in fn recv_window")
.leader_data()
.cloned();
let leader_unknown = maybe_leader.is_none();
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq)
}
@ -174,80 +77,41 @@ fn recv_window(
.to_owned(),
);
trace!(
"{}: RECV_WINDOW {} {}: got packets {}",
id,
*consumed,
*received,
dq.len(),
);
retransmit_all_leader_blocks(
window,
maybe_leader,
&dq,
id,
*consumed,
*received,
retransmit,
pending_retransmits,
)?;
retransmit_all_leader_blocks(&dq, leader_scheduler, retransmit)?;
let mut pixs = Vec::new();
//send a contiguous set of blocks
let mut consume_queue = Vec::new();
trace!("{} num blobs received: {}", id, dq.len());
for b in dq {
let (pix, meta_size) = {
let p = b.read().unwrap();
(p.index()?, p.meta.size)
};
pixs.push(pix);
if !window
.read()
.unwrap()
.blob_idx_in_window(&id, pix, *consumed, received)
{
continue;
}
// For downloading storage blobs,
// we only want up to a certain index
// then stop
if max_ix != 0 && pix > max_ix {
continue;
}
trace!("{} window pix: {} size: {}", id, pix, meta_size);
window.write().unwrap().process_blob(
id,
b,
let _ = process_blob(
leader_scheduler,
db_ledger,
&b,
max_ix,
pix,
&mut consume_queue,
consumed,
tick_height,
leader_unknown,
pending_retransmits,
done,
);
}
trace!(
"Elapsed processing time in recv_window(): {}",
duration_as_ms(&now.elapsed())
);
// Send a signal when we hit the max entry_height
if max_ix != 0 && *consumed == (max_ix + 1) {
done.store(true, Ordering::Relaxed);
}
}
if log_enabled!(Level::Trace) {
trace!("{}", window.read().unwrap().print(id, *consumed));
trace!(
"{}: consumed: {} received: {} sending consume.len: {} pixs: {:?} took {} ms",
id,
*consumed,
*received,
consume_queue.len(),
pixs,
duration_as_ms(&now.elapsed())
);
}
if !consume_queue.is_empty() {
inc_new_counter_info!("streamer-recv_window-consume", consume_queue.len());
s.send(consume_queue)?;
@ -257,8 +121,8 @@ fn recv_window(
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn window_service(
db_ledger: Arc<RwLock<DbLedger>>,
cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow,
tick_height: u64,
entry_height: u64,
max_entry_height: u64,
@ -273,27 +137,20 @@ pub fn window_service(
.name("solana-window".to_string())
.spawn(move || {
let mut tick_height_ = tick_height;
let mut consumed = entry_height;
let mut received = entry_height;
let mut last = entry_height;
let mut times = 0;
let id = cluster_info.read().unwrap().my_data().id;
let mut pending_retransmits = false;
let id = cluster_info.read().unwrap().id();
trace!("{}: RECV_WINDOW started", id);
loop {
// Check if leader rotation was configured
if let Err(e) = recv_window(
&window,
db_ledger.write().unwrap().borrow_mut(),
&id,
&cluster_info,
&mut consumed,
&mut received,
leader_scheduler.read().unwrap().borrow(),
&mut tick_height_,
max_entry_height,
&r,
&s,
&retransmit,
&mut pending_retransmits,
&done,
) {
match e {
@ -306,45 +163,62 @@ pub fn window_service(
}
}
submit(
influxdb::Point::new("window-stage")
.add_field("consumed", influxdb::Value::Integer(consumed as i64))
.to_owned(),
);
let meta = {
let rlock = db_ledger.read().unwrap();
if received <= consumed {
trace!(
"{} we have everything received:{} consumed:{}",
id,
received,
consumed
rlock
.meta_cf
.get(&rlock.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
};
if let Ok(Some(meta)) = meta {
let received = meta.received;
let consumed = meta.consumed;
submit(
influxdb::Point::new("window-stage")
.add_field("consumed", influxdb::Value::Integer(consumed as i64))
.to_owned(),
);
continue;
}
//exponential backoff
if !repair_backoff(&mut last, &mut times, consumed) {
trace!("{} !repair_backoff() times = {}", id, times);
continue;
}
trace!("{} let's repair! times = {}", id, times);
// Consumed should never be bigger than received
assert!(consumed <= received);
if received == consumed {
trace!(
"{} we have everything received: {} consumed: {}",
id,
received,
consumed
);
continue;
}
let mut window = window.write().unwrap();
let reqs = window.repair(
&cluster_info,
&id,
times,
consumed,
received,
tick_height_,
max_entry_height,
&leader_scheduler,
);
for (to, req) in reqs {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
0
});
//exponential backoff
if !repair_backoff(&mut last, &mut times, consumed) {
trace!("{} !repair_backoff() times = {}", id, times);
continue;
}
trace!("{} let's repair! times = {}", id, times);
let reqs = repair(
DEFAULT_SLOT_HEIGHT,
db_ledger.read().unwrap().borrow(),
&cluster_info,
&id,
times,
tick_height_,
max_entry_height,
&leader_scheduler,
);
if let Ok(reqs) = reqs {
for (to, req) in reqs {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
0
});
}
}
}
}
}).unwrap()
@ -353,18 +227,21 @@ pub fn window_service(
#[cfg(test)]
mod test {
use cluster_info::{ClusterInfo, Node};
use db_ledger::DbLedger;
use entry::Entry;
use leader_scheduler::LeaderScheduler;
use ledger::get_tmp_ledger_path;
use logger;
use packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE};
use rocksdb::{Options, DB};
use solana_sdk::hash::Hash;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use streamer::{blob_receiver, responder};
use window::default_window;
use window_service::{repair_backoff, window_service};
fn get_entries(r: Receiver<Vec<Entry>>, num: &mut usize) {
@ -396,11 +273,14 @@ mod test {
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
let (s_window, r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let win = Arc::new(RwLock::new(default_window()));
let done = Arc::new(AtomicBool::new(false));
let db_ledger_path = get_tmp_ledger_path("window_send_test");
let db_ledger = Arc::new(RwLock::new(
DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"),
));
let t_window = window_service(
db_ledger,
subs,
win,
0,
0,
0,
@ -444,10 +324,13 @@ mod test {
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
DB::destroy(&Options::default(), &db_ledger_path)
.expect("Expected successful database destuction");
let _ignored = remove_dir_all(&db_ledger_path);
}
#[test]
pub fn window_send_no_leader_test() {
pub fn window_send_leader_test2() {
logger::setup();
let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
@ -459,11 +342,14 @@ mod test {
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
let (s_window, _r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let win = Arc::new(RwLock::new(default_window()));
let done = Arc::new(AtomicBool::new(false));
let db_ledger_path = get_tmp_ledger_path("window_send_late_leader_test");
let db_ledger = Arc::new(RwLock::new(
DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"),
));
let t_window = window_service(
db_ledger,
subs.clone(),
win,
0,
0,
0,
@ -471,13 +357,7 @@ mod test {
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
// TODO: For now, the window still checks the ClusterInfo for the current leader
// to determine whether to retransmit a block. In the future when we rely on
// the LeaderScheduler for retransmits, this test will need to be rewritten
// because a leader should only be unknown in the window when the write stage
// hasn't yet calculated the leaders for slots in the next epoch (on entries
// at heights that are multiples of seed_rotation_interval in LeaderScheduler)
Arc::new(RwLock::new(LeaderScheduler::default())),
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
done,
);
let t_responder = {
@ -500,75 +380,8 @@ mod test {
msgs.push(b);
}
s_responder.send(msgs).expect("send");
t_responder
};
assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err());
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
}
#[test]
pub fn window_send_late_leader_test() {
logger::setup();
let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let cluster_info_me = ClusterInfo::new(tn.info.clone());
let me_id = cluster_info_me.my_data().id;
let subs = Arc::new(RwLock::new(cluster_info_me));
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
let (s_window, _r_window) = channel();
let (s_retransmit, r_retransmit) = channel();
let win = Arc::new(RwLock::new(default_window()));
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(
subs.clone(),
win,
0,
0,
0,
r_reader,
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
// TODO: For now, the window still checks the ClusterInfo for the current leader
// to determine whether to retransmit a block. In the future when we rely on
// the LeaderScheduler for retransmits, this test will need to be rewritten
// because a leader should only be unknown in the window when the write stage
// hasn't yet calculated the leaders for slots in the next epoch (on entries
// at heights that are multiples of seed_rotation_interval in LeaderScheduler)
Arc::new(RwLock::new(LeaderScheduler::default())),
done,
);
let t_responder = {
let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> =
tn.sockets.replicate.into_iter().map(Arc::new).collect();
let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let mut msgs = Vec::new();
for v in 0..10 {
let i = 9 - v;
let b = SharedBlob::default();
{
let mut w = b.write().unwrap();
w.set_index(i).unwrap();
w.set_id(&me_id).unwrap();
assert_eq!(i, w.index().unwrap());
w.meta.size = PACKET_DATA_SIZE;
w.meta.set_addr(&tn.info.ncp);
}
msgs.push(b);
}
s_responder.send(msgs).expect("send");
assert!(r_retransmit.recv_timeout(Duration::new(3, 0)).is_err());
subs.write().unwrap().set_leader(me_id);
let mut msgs1 = Vec::new();
for v in 1..5 {
let i = 9 + v;
@ -595,6 +408,9 @@ mod test {
t_receiver.join().expect("join");
t_responder.join().expect("join");
t_window.join().expect("join");
DB::destroy(&Options::default(), &db_ledger_path)
.expect("Expected successful database destuction");
let _ignored = remove_dir_all(&db_ledger_path);
}
#[test]

View File

@ -9,11 +9,12 @@ extern crate solana_sdk;
use solana::blob_fetch_stage::BlobFetchStage;
use solana::cluster_info::{ClusterInfo, Node, NodeInfo};
use solana::contact_info::ContactInfo;
use solana::db_ledger::DbLedger;
use solana::entry::{reconstruct_entries_from_blobs, Entry};
use solana::fullnode::{Fullnode, FullnodeReturnType};
use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use solana::ledger::{
create_tmp_genesis, create_tmp_sample_ledger, get_tmp_ledger_path, read_ledger, LedgerWindow,
create_tmp_genesis, create_tmp_sample_ledger, read_ledger, tmp_copy_ledger, LedgerWindow,
LedgerWriter,
};
use solana::logger;
@ -33,9 +34,8 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::{duration_as_ms, duration_as_s};
use std::collections::{HashSet, VecDeque};
use std::env;
use std::fs::{copy, create_dir_all, remove_dir_all};
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::{sleep, Builder, JoinHandle};
@ -110,22 +110,6 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
rv
}
fn tmp_copy_ledger(from: &str, name: &str) -> String {
let tostr = get_tmp_ledger_path(name);
{
let to = Path::new(&tostr);
let from = Path::new(&from);
create_dir_all(to).unwrap();
copy(from.join("data"), to.join("data")).unwrap();
copy(from.join("index"), to.join("index")).unwrap();
}
tostr
}
fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec<Entry> {
let mut id = start_hash;
let mut num_hashes = 0;
@ -1087,6 +1071,7 @@ fn test_leader_validator_basic() {
assert!(min_len >= bootstrap_height);
for path in ledger_paths {
DbLedger::destroy(&path).expect("Expected successful database destruction");
remove_dir_all(path).unwrap();
}
}
@ -1346,28 +1331,20 @@ fn test_full_leader_validator_network() {
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(leader_rotation_interval),
Some(100),
);
let exit = Arc::new(AtomicBool::new(false));
// Start the bootstrap leader fullnode
let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
Arc::new(node_keypairs.pop_front().unwrap()),
Arc::new(vote_account_keypairs.pop_front().unwrap()),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
)));
let mut nodes: Vec<Arc<RwLock<Fullnode>>> = vec![bootstrap_leader.clone()];
let mut t_nodes = vec![run_node(
bootstrap_leader_info.id,
bootstrap_leader,
exit.clone(),
)];
// Postpone starting the leader until after the validators are up and running
// to avoid
// 1) Scenario where leader rotates before validators can start up
// 2) Modifying the leader ledger which validators are going to be copying
// during startup
let leader_keypair = node_keypairs.pop_front().unwrap();
let leader_vote_keypair = vote_account_keypairs.pop_front().unwrap();
let mut nodes: Vec<Arc<RwLock<Fullnode>>> = vec![];
let mut t_nodes = vec![];
// Start up the validators
for kp in node_keypairs.into_iter() {
@ -1375,7 +1352,9 @@ fn test_full_leader_validator_network() {
&bootstrap_leader_ledger_path,
"test_full_leader_validator_network",
);
ledger_paths.push(validator_ledger_path.clone());
let validator_id = kp.pubkey();
let validator_node = Node::new_localhost_with_pubkey(validator_id);
let validator = Arc::new(RwLock::new(Fullnode::new(
@ -1393,6 +1372,25 @@ fn test_full_leader_validator_network() {
t_nodes.push(run_node(validator_id, validator, exit.clone()));
}
// Start up the bootstrap leader
let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
Arc::new(leader_keypair),
Arc::new(leader_vote_keypair),
Some(bootstrap_leader_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
None,
)));
nodes.push(bootstrap_leader.clone());
t_nodes.push(run_node(
bootstrap_leader_info.id,
bootstrap_leader,
exit.clone(),
));
// Wait for convergence
let num_converged = converge(&bootstrap_leader_info, N + 1).len();
assert_eq!(num_converged, N + 1);
@ -1495,7 +1493,9 @@ fn test_full_leader_validator_network() {
}
assert!(shortest.unwrap() >= target_height);
for path in ledger_paths {
DbLedger::destroy(&path).expect("Expected successful database destruction");
remove_dir_all(path).unwrap();
}
}