add db_ledger genesis, rework to_blob(), to_blobs() (#2135)

This commit is contained in:
Rob Walker 2018-12-12 20:42:12 -08:00 committed by GitHub
parent a05a378db4
commit 4f48f1a850
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 69 additions and 82 deletions

View File

@ -5,6 +5,7 @@ extern crate clap;
use serde_json; use serde_json;
use clap::{App, Arg}; use clap::{App, Arg};
use solana::db_ledger::genesis;
use solana::ledger::LedgerWriter; use solana::ledger::LedgerWriter;
use solana::mint::Mint; use solana::mint::Mint;
use solana_sdk::signature::{read_keypair, KeypairUtil}; use solana_sdk::signature::{read_keypair, KeypairUtil};
@ -79,9 +80,13 @@ fn main() -> Result<(), Box<dyn error::Error>> {
); );
// Write the ledger entries // Write the ledger entries
let entries = mint.create_entries();
let ledger_path = matches.value_of("ledger").unwrap(); let ledger_path = matches.value_of("ledger").unwrap();
let mut ledger_writer = LedgerWriter::open(&ledger_path, true)?; let mut ledger_writer = LedgerWriter::open(&ledger_path, true)?;
ledger_writer.write_entries(&mint.create_entries())?; ledger_writer.write_entries(&entries)?;
genesis(&ledger_path, &leader_keypair, &entries)?;
Ok(()) Ok(())
} }

View File

@ -70,13 +70,12 @@ fn broadcast(
// Generate the slot heights for all the entries inside ventries // Generate the slot heights for all the entries inside ventries
let slot_heights = generate_slots(&ventries, leader_scheduler); let slot_heights = generate_slots(&ventries, leader_scheduler);
let blobs_vec: Vec<_> = ventries let blobs: Vec<_> = ventries
.into_par_iter() .into_par_iter()
.flat_map(|p| p.to_blobs()) .flat_map(|p| p.to_blobs())
.collect(); .collect();
let blobs_slot_heights: Vec<(SharedBlob, u64)> = let blobs_slot_heights: Vec<(SharedBlob, u64)> = blobs.into_iter().zip(slot_heights).collect();
blobs_vec.into_iter().zip(slot_heights).collect();
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed()); let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());

View File

@ -320,10 +320,12 @@ impl DbLedger {
I: IntoIterator, I: IntoIterator,
I::Item: Borrow<Entry>, I::Item: Borrow<Entry>,
{ {
let shared_blobs = entries let shared_blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
.into_iter() let b = entry.borrow().to_blob();
.enumerate() b.write().unwrap().set_index(idx as u64).unwrap();
.map(|(idx, entry)| entry.borrow().to_blob(Some(idx as u64), None, None)); b
});
self.write_shared_blobs(slot, shared_blobs) self.write_shared_blobs(slot, shared_blobs)
} }
@ -534,19 +536,19 @@ where
} }
} }
pub fn genesis<'a, I>(ledger_path: &str, keypair: Option<&Keypair>, entries: I) -> Result<()> pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Result<()>
where where
I: IntoIterator<Item = &'a Entry>, I: IntoIterator<Item = &'a Entry>,
{ {
let mut db_ledger = DbLedger::open(ledger_path)?; let mut db_ledger = DbLedger::open(ledger_path)?;
let pubkey = keypair.map(|k| k.pubkey());
// TODO sign these blobs with keypair // TODO sign these blobs with keypair
let blobs = entries let blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
.into_iter() let b = entry.borrow().to_blob();
.enumerate() b.write().unwrap().set_index(idx as u64).unwrap();
.map(|(idx, entry)| entry.borrow().to_blob(Some(idx as u64), pubkey, None)); b.write().unwrap().set_id(&keypair.pubkey()).unwrap();
b
});
db_ledger.write_shared_blobs(DEFAULT_SLOT_HEIGHT, blobs)?; db_ledger.write_shared_blobs(DEFAULT_SLOT_HEIGHT, blobs)?;
Ok(()) Ok(())
@ -556,6 +558,7 @@ where
mod tests { mod tests {
use super::*; use super::*;
use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block}; use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
use crate::packet::index_blobs;
#[test] #[test]
fn test_put_get_simple() { fn test_put_get_simple() {
@ -611,6 +614,12 @@ mod tests {
#[test] #[test]
fn test_get_blobs_bytes() { fn test_get_blobs_bytes() {
let shared_blobs = make_tiny_test_entries(10).to_blobs(); let shared_blobs = make_tiny_test_entries(10).to_blobs();
index_blobs(
shared_blobs.iter().zip(vec![0u64; 10].into_iter()),
&Keypair::new().pubkey(),
0,
);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
let slot = DEFAULT_SLOT_HEIGHT; let slot = DEFAULT_SLOT_HEIGHT;
@ -836,9 +845,9 @@ mod tests {
pub fn test_genesis_and_entry_iterator() { pub fn test_genesis_and_entry_iterator() {
// Create RocksDb ledger // Create RocksDb ledger
let entries = make_tiny_test_entries(100); let entries = make_tiny_test_entries(100);
let ledger_path = get_tmp_ledger_path("test_entry_iterator"); let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator");
{ {
assert!(genesis(&ledger_path, None, &entries).is_ok()); assert!(genesis(&ledger_path, &Keypair::new(), &entries).is_ok());
let ledger = DbLedger::open(&ledger_path).expect("open failed"); let ledger = DbLedger::open(&ledger_path).expect("open failed");

View File

@ -423,7 +423,7 @@ mod test {
#[cfg(all(feature = "erasure", test))] #[cfg(all(feature = "erasure", test))]
use crate::erasure::{NUM_CODING, NUM_DATA}; use crate::erasure::{NUM_CODING, NUM_DATA};
use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block}; use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries, Block};
use crate::packet::{Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE}; use crate::packet::{index_blobs, Blob, Packet, Packets, SharedBlob, PACKET_DATA_SIZE};
use crate::streamer::{receiver, responder, PacketReceiver}; use crate::streamer::{receiver, responder, PacketReceiver};
use rocksdb::{Options, DB}; use rocksdb::{Options, DB};
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
@ -689,6 +689,13 @@ mod test {
// Write entries // Write entries
let num_entries = 10; let num_entries = 10;
let shared_blobs = make_tiny_test_entries(num_entries).to_blobs(); let shared_blobs = make_tiny_test_entries(num_entries).to_blobs();
index_blobs(
shared_blobs.iter().zip(vec![0u64; num_entries].into_iter()),
&Keypair::new().pubkey(),
0,
);
let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
db_ledger.write_blobs(slot, &blobs).unwrap(); db_ledger.write_blobs(slot, &blobs).unwrap();

View File

@ -7,11 +7,9 @@ use crate::poh::Poh;
use crate::result::Result; use crate::result::Result;
use bincode::{deserialize, serialize_into, serialized_size}; use bincode::{deserialize, serialize_into, serialized_size};
use solana_sdk::hash::Hash; use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::transaction::Transaction; use solana_sdk::transaction::Transaction;
use std::io::Cursor; use std::io::Cursor;
use std::mem::size_of; use std::mem::size_of;
use std::net::SocketAddr;
use std::sync::mpsc::{Receiver, Sender}; use std::sync::mpsc::{Receiver, Sender};
pub type EntrySender = Sender<Vec<Entry>>; pub type EntrySender = Sender<Vec<Entry>>;
@ -106,12 +104,7 @@ impl Entry {
entry entry
} }
pub fn to_blob( pub fn to_blob(&self) -> SharedBlob {
&self,
idx: Option<u64>,
id: Option<Pubkey>,
addr: Option<&SocketAddr>,
) -> SharedBlob {
let blob = SharedBlob::default(); let blob = SharedBlob::default();
{ {
let mut blob_w = blob.write().unwrap(); let mut blob_w = blob.write().unwrap();
@ -121,17 +114,6 @@ impl Entry {
out.position() as usize out.position() as usize
}; };
blob_w.set_size(pos); blob_w.set_size(pos);
if let Some(idx) = idx {
blob_w.set_index(idx).expect("set_index()");
}
if let Some(id) = id {
blob_w.set_id(&id).expect("set_id()");
}
if let Some(addr) = addr {
blob_w.meta.set_addr(addr);
}
blob_w.set_flags(0).unwrap();
} }
blob blob
} }

View File

@ -674,9 +674,9 @@ mod tests {
make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig, make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig,
}; };
use crate::ledger::{ use crate::ledger::{
create_tmp_genesis, create_tmp_sample_ledger, tmp_copy_ledger, LedgerWriter, create_tmp_genesis, create_tmp_sample_ledger, make_consecutive_blobs, tmp_copy_ledger,
LedgerWriter,
}; };
use crate::packet::make_consecutive_blobs;
use crate::service::Service; use crate::service::Service;
use crate::streamer::responder; use crate::streamer::responder;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
@ -1050,7 +1050,7 @@ mod tests {
let total_blobs_to_send = bootstrap_height + extra_blobs; let total_blobs_to_send = bootstrap_height + extra_blobs;
let tvu_address = &validator_info.tvu; let tvu_address = &validator_info.tvu;
let msgs = make_consecutive_blobs( let msgs = make_consecutive_blobs(
leader_id, &leader_id,
total_blobs_to_send, total_blobs_to_send,
ledger_initial_len, ledger_initial_len,
last_id, last_id,

View File

@ -20,7 +20,6 @@ use std::fs::{copy, create_dir_all, remove_dir_all, File, OpenOptions};
use std::io::prelude::*; use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
use std::mem::size_of; use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::Path; use std::path::Path;
// //
@ -407,7 +406,7 @@ impl LedgerWriter {
I: IntoIterator<Item = &'a Entry>, I: IntoIterator<Item = &'a Entry>,
{ {
for entry in entries { for entry in entries {
self.write_entry_noflush(&entry)?; self.write_entry_noflush(entry)?;
} }
self.index.flush()?; self.index.flush()?;
self.data.flush()?; self.data.flush()?;
@ -452,7 +451,6 @@ pub trait Block {
/// Verifies the hashes and counts of a slice of transactions are all consistent. /// Verifies the hashes and counts of a slice of transactions are all consistent.
fn verify(&self, start_hash: &Hash) -> bool; fn verify(&self, start_hash: &Hash) -> bool;
fn to_blobs(&self) -> Vec<SharedBlob>; fn to_blobs(&self) -> Vec<SharedBlob>;
fn to_blobs_with_id(&self, id: Pubkey, start_id: u64, addr: &SocketAddr) -> Vec<SharedBlob>;
fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>; fn votes(&self) -> Vec<(Pubkey, Vote, Hash)>;
} }
@ -480,16 +478,8 @@ impl Block for [Entry] {
}) })
} }
fn to_blobs_with_id(&self, id: Pubkey, start_idx: u64, addr: &SocketAddr) -> Vec<SharedBlob> {
self.iter()
.enumerate()
.map(|(i, entry)| entry.to_blob(Some(start_idx + i as u64), Some(id), Some(&addr)))
.collect()
}
fn to_blobs(&self) -> Vec<SharedBlob> { fn to_blobs(&self) -> Vec<SharedBlob> {
let default_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); self.iter().map(|entry| entry.to_blob()).collect()
self.to_blobs_with_id(Pubkey::default(), 0, &default_addr)
} }
fn votes(&self) -> Vec<(Pubkey, Vote, Hash)> { fn votes(&self) -> Vec<(Pubkey, Vote, Hash)> {
@ -705,6 +695,28 @@ pub fn make_large_test_entries(num_entries: usize) -> Vec<Entry> {
vec![entry; num_entries] vec![entry; num_entries]
} }
#[cfg(test)]
pub fn make_consecutive_blobs(
    id: &Pubkey,
    num_blobs_to_make: u64,
    start_height: u64,
    start_hash: Hash,
    addr: &std::net::SocketAddr,
) -> Vec<SharedBlob> {
    // One tick entry per requested blob, all chained from start_hash.
    let blobs = create_ticks(num_blobs_to_make as usize, start_hash).to_blobs();
    // Stamp each blob with a consecutive index (starting at start_height),
    // the sender id, and the destination address.
    for (offset, blob) in blobs.iter().enumerate() {
        let mut b = blob.write().unwrap();
        b.set_index(start_height + offset as u64).unwrap();
        b.set_id(id).unwrap();
        b.meta.set_addr(addr);
    }
    blobs
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -1,17 +1,11 @@
//! The `packet` module defines data structures and methods to pull data from the network. //! The `packet` module defines data structures and methods to pull data from the network.
use crate::counter::Counter; use crate::counter::Counter;
#[cfg(test)]
use crate::entry::Entry;
#[cfg(test)]
use crate::ledger::Block;
use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS}; use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use bincode::{deserialize, serialize}; use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use log::Level; use log::Level;
use serde::Serialize; use serde::Serialize;
#[cfg(test)]
use solana_sdk::hash::Hash;
pub use solana_sdk::packet::PACKET_DATA_SIZE; pub use solana_sdk::packet::PACKET_DATA_SIZE;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::borrow::Borrow; use std::borrow::Borrow;
@ -459,27 +453,6 @@ where
} }
} }
#[cfg(test)]
pub fn make_consecutive_blobs(
    me_id: Pubkey,
    num_blobs_to_make: u64,
    start_height: u64,
    start_hash: Hash,
    addr: &SocketAddr,
) -> SharedBlobs {
    // Build a chain of empty entries: each entry hashes once from the
    // previous entry's id, starting from start_hash.
    let mut last_hash = start_hash;
    let all_entries: Vec<_> = (0..num_blobs_to_make)
        .map(|_| {
            let entry = Entry::new(&last_hash, 0, 1, vec![]);
            last_hash = entry.id;
            entry
        })
        .collect();
    // Convert to blobs tagged with me_id, consecutive indices from
    // start_height, and the given address.
    let mut new_blobs = all_entries.to_blobs_with_id(me_id, start_height, addr);
    new_blobs.truncate(num_blobs_to_make as usize);
    new_blobs
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::packet::{ use crate::packet::{

View File

@ -236,9 +236,9 @@ mod test {
use crate::db_ledger::DbLedger; use crate::db_ledger::DbLedger;
use crate::entry::Entry; use crate::entry::Entry;
use crate::leader_scheduler::LeaderScheduler; use crate::leader_scheduler::LeaderScheduler;
use crate::ledger::get_tmp_ledger_path; use crate::ledger::{get_tmp_ledger_path, make_consecutive_blobs};
use crate::logger; use crate::logger;
use crate::packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE}; use crate::packet::{SharedBlob, PACKET_DATA_SIZE};
use crate::streamer::{blob_receiver, responder}; use crate::streamer::{blob_receiver, responder};
use crate::window_service::{repair_backoff, window_service}; use crate::window_service::{repair_backoff, window_service};
use rocksdb::{Options, DB}; use rocksdb::{Options, DB};
@ -306,7 +306,7 @@ mod test {
let num_blobs_to_make = 10; let num_blobs_to_make = 10;
let gossip_address = &tn.info.gossip; let gossip_address = &tn.info.gossip;
let msgs = make_consecutive_blobs( let msgs = make_consecutive_blobs(
me_id, &me_id,
num_blobs_to_make, num_blobs_to_make,
0, 0,
Hash::default(), Hash::default(),