DbLedger doesn't need to be mut, doesn't need an RwLock (#2215)

* DbLedger doesn't need to be mut, doesn't need an RwLock

* fix erasure cases
This commit is contained in:
Rob Walker 2018-12-18 15:18:57 -08:00 committed by GitHub
parent b101f40c32
commit a65022aed7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 95 additions and 139 deletions

View File

@ -33,7 +33,7 @@ pub enum BroadcastServiceReturnType {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn broadcast( fn broadcast(
db_ledger: &Arc<RwLock<DbLedger>>, db_ledger: &Arc<DbLedger>,
max_tick_height: Option<u64>, max_tick_height: Option<u64>,
leader_id: Pubkey, leader_id: Pubkey,
node_info: &NodeInfo, node_info: &NodeInfo,
@ -130,7 +130,7 @@ fn broadcast(
assert!(win[pos].data.is_none()); assert!(win[pos].data.is_none());
win[pos].data = Some(b.clone()); win[pos].data = Some(b.clone());
} }
db_ledger.write().unwrap().write_shared_blobs(vec![b])?; db_ledger.write_shared_blobs(vec![b])?;
} }
} }
@ -236,7 +236,7 @@ pub struct BroadcastService {
impl BroadcastService { impl BroadcastService {
fn run( fn run(
db_ledger: &Arc<RwLock<DbLedger>>, db_ledger: &Arc<DbLedger>,
sock: &UdpSocket, sock: &UdpSocket,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
window: &SharedWindow, window: &SharedWindow,
@ -304,7 +304,7 @@ impl BroadcastService {
/// completing the cycle. /// completing the cycle.
#[allow(clippy::too_many_arguments, clippy::new_ret_no_self)] #[allow(clippy::too_many_arguments, clippy::new_ret_no_self)]
pub fn new( pub fn new(
db_ledger: Arc<RwLock<DbLedger>>, db_ledger: Arc<DbLedger>,
sock: UdpSocket, sock: UdpSocket,
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow, window: SharedWindow,
@ -365,7 +365,7 @@ mod test {
use std::time::Duration; use std::time::Duration;
struct DummyBroadcastService { struct DummyBroadcastService {
db_ledger: Arc<RwLock<DbLedger>>, db_ledger: Arc<DbLedger>,
broadcast_service: BroadcastService, broadcast_service: BroadcastService,
entry_sender: Sender<Vec<Entry>>, entry_sender: Sender<Vec<Entry>>,
exit_signal: Arc<AtomicBool>, exit_signal: Arc<AtomicBool>,
@ -379,7 +379,7 @@ mod test {
max_tick_height: u64, max_tick_height: u64,
) -> DummyBroadcastService { ) -> DummyBroadcastService {
// Make the database ledger // Make the database ledger
let db_ledger = Arc::new(RwLock::new(DbLedger::open(ledger_path).unwrap())); let db_ledger = Arc::new(DbLedger::open(ledger_path).unwrap());
// Make the leader node and scheduler // Make the leader node and scheduler
let leader_info = Node::new_localhost_with_pubkey(leader_pubkey); let leader_info = Node::new_localhost_with_pubkey(leader_pubkey);
@ -459,16 +459,16 @@ mod test {
} }
sleep(Duration::from_millis(2000)); sleep(Duration::from_millis(2000));
let r_db = broadcast_service.db_ledger.read().unwrap(); let db_ledger = broadcast_service.db_ledger;
for i in 0..max_tick_height - start_tick_height { for i in 0..max_tick_height - start_tick_height {
let (_, slot) = leader_scheduler let (_, slot) = leader_scheduler
.read() .read()
.unwrap() .unwrap()
.get_scheduled_leader(start_tick_height + i + 1) .get_scheduled_leader(start_tick_height + i + 1)
.expect("Leader should exist"); .expect("Leader should exist");
let result = r_db let result = db_ledger
.data_cf .data_cf
.get_by_slot_index(&r_db.db, slot, entry_height + i) .get_by_slot_index(&db_ledger.db, slot, entry_height + i)
.unwrap(); .unwrap();
assert!(result.is_some()); assert!(result.is_some());

View File

@ -673,26 +673,20 @@ impl ClusterInfo {
fn run_window_request( fn run_window_request(
from: &NodeInfo, from: &NodeInfo,
from_addr: &SocketAddr, from_addr: &SocketAddr,
db_ledger: Option<&Arc<RwLock<DbLedger>>>, db_ledger: Option<&Arc<DbLedger>>,
me: &NodeInfo, me: &NodeInfo,
ix: u64, ix: u64,
) -> Vec<SharedBlob> { ) -> Vec<SharedBlob> {
if let Some(db_ledger) = db_ledger { if let Some(db_ledger) = db_ledger {
let meta = { let meta = db_ledger
let r_db = db_ledger.read().unwrap(); .meta_cf
.get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT));
r_db.meta_cf
.get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
};
if let Ok(Some(meta)) = meta { if let Ok(Some(meta)) = meta {
let max_slot = meta.received_slot; let max_slot = meta.received_slot;
// Try to find the requested index in one of the slots // Try to find the requested index in one of the slots
for i in 0..=max_slot { for i in 0..=max_slot {
let get_result = { let get_result = db_ledger.data_cf.get_by_slot_index(&db_ledger.db, i, ix);
let r_db = db_ledger.read().unwrap();
r_db.data_cf.get_by_slot_index(&r_db.db, i, ix)
};
if let Ok(Some(blob_data)) = get_result { if let Ok(Some(blob_data)) = get_result {
inc_new_counter_info!("cluster_info-window-request-ledger", 1); inc_new_counter_info!("cluster_info-window-request-ledger", 1);
@ -716,7 +710,7 @@ impl ClusterInfo {
//TODO we should first coalesce all the requests //TODO we should first coalesce all the requests
fn handle_blob( fn handle_blob(
obj: &Arc<RwLock<Self>>, obj: &Arc<RwLock<Self>>,
db_ledger: Option<&Arc<RwLock<DbLedger>>>, db_ledger: Option<&Arc<DbLedger>>,
blob: &Blob, blob: &Blob,
) -> Vec<SharedBlob> { ) -> Vec<SharedBlob> {
deserialize(&blob.data[..blob.meta.size]) deserialize(&blob.data[..blob.meta.size])
@ -830,7 +824,7 @@ impl ClusterInfo {
fn handle_request_window_index( fn handle_request_window_index(
me: &Arc<RwLock<Self>>, me: &Arc<RwLock<Self>>,
from: &ContactInfo, from: &ContactInfo,
db_ledger: Option<&Arc<RwLock<DbLedger>>>, db_ledger: Option<&Arc<DbLedger>>,
ix: u64, ix: u64,
from_addr: &SocketAddr, from_addr: &SocketAddr,
) -> Vec<SharedBlob> { ) -> Vec<SharedBlob> {
@ -870,7 +864,7 @@ impl ClusterInfo {
fn handle_protocol( fn handle_protocol(
me: &Arc<RwLock<Self>>, me: &Arc<RwLock<Self>>,
from_addr: &SocketAddr, from_addr: &SocketAddr,
db_ledger: Option<&Arc<RwLock<DbLedger>>>, db_ledger: Option<&Arc<DbLedger>>,
request: Protocol, request: Protocol,
) -> Vec<SharedBlob> { ) -> Vec<SharedBlob> {
match request { match request {
@ -934,7 +928,7 @@ impl ClusterInfo {
/// Process messages from the network /// Process messages from the network
fn run_listen( fn run_listen(
obj: &Arc<RwLock<Self>>, obj: &Arc<RwLock<Self>>,
db_ledger: Option<&Arc<RwLock<DbLedger>>>, db_ledger: Option<&Arc<DbLedger>>,
requests_receiver: &BlobReceiver, requests_receiver: &BlobReceiver,
response_sender: &BlobSender, response_sender: &BlobSender,
) -> Result<()> { ) -> Result<()> {
@ -954,7 +948,7 @@ impl ClusterInfo {
} }
pub fn listen( pub fn listen(
me: Arc<RwLock<Self>>, me: Arc<RwLock<Self>>,
db_ledger: Option<Arc<RwLock<DbLedger>>>, db_ledger: Option<Arc<DbLedger>>,
requests_receiver: BlobReceiver, requests_receiver: BlobReceiver,
response_sender: BlobSender, response_sender: BlobSender,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
@ -1225,7 +1219,7 @@ mod tests {
solana_logger::setup(); solana_logger::setup();
let ledger_path = get_tmp_ledger_path("run_window_request"); let ledger_path = get_tmp_ledger_path("run_window_request");
{ {
let db_ledger = Arc::new(RwLock::new(DbLedger::open(&ledger_path).unwrap())); let db_ledger = Arc::new(DbLedger::open(&ledger_path).unwrap());
let me = NodeInfo::new( let me = NodeInfo::new(
Keypair::new().pubkey(), Keypair::new().pubkey(),
socketaddr!("127.0.0.1:1234"), socketaddr!("127.0.0.1:1234"),
@ -1249,12 +1243,9 @@ mod tests {
w_blob.meta.size = data_size + BLOB_HEADER_SIZE; w_blob.meta.size = data_size + BLOB_HEADER_SIZE;
} }
{ db_ledger
let mut w_ledger = db_ledger.write().unwrap(); .write_shared_blobs(vec![&blob])
w_ledger .expect("Expect successful ledger write");
.write_shared_blobs(vec![&blob])
.expect("Expect successful ledger write");
}
let rv = let rv =
ClusterInfo::run_window_request(&me, &socketaddr_any!(), Some(&db_ledger), &me, 1); ClusterInfo::run_window_request(&me, &socketaddr_any!(), Some(&db_ledger), &me, 1);

View File

@ -285,7 +285,7 @@ impl DbLedger {
Ok(()) Ok(())
} }
pub fn write_shared_blobs<I>(&mut self, shared_blobs: I) -> Result<Vec<Entry>> pub fn write_shared_blobs<I>(&self, shared_blobs: I) -> Result<Vec<Entry>>
where where
I: IntoIterator, I: IntoIterator,
I::Item: Borrow<SharedBlob>, I::Item: Borrow<SharedBlob>,
@ -302,7 +302,7 @@ impl DbLedger {
Ok(entries) Ok(entries)
} }
pub fn write_blobs<'a, I>(&mut self, blobs: I) -> Result<Vec<Entry>> pub fn write_blobs<'a, I>(&self, blobs: I) -> Result<Vec<Entry>>
where where
I: IntoIterator<Item = &'a &'a Blob>, I: IntoIterator<Item = &'a &'a Blob>,
{ {
@ -316,7 +316,7 @@ impl DbLedger {
Ok(entries) Ok(entries)
} }
pub fn write_entries<I>(&mut self, slot: u64, entries: I) -> Result<Vec<Entry>> pub fn write_entries<I>(&self, slot: u64, entries: I) -> Result<Vec<Entry>>
where where
I: IntoIterator, I: IntoIterator,
I::Item: Borrow<Entry>, I::Item: Borrow<Entry>,
@ -427,7 +427,7 @@ impl DbLedger {
// //
// Return tuple of (number of blob read, total size of blobs read) // Return tuple of (number of blob read, total size of blobs read)
pub fn get_blob_bytes( pub fn get_blob_bytes(
&mut self, &self,
start_index: u64, start_index: u64,
num_blobs: u64, num_blobs: u64,
buf: &mut [u8], buf: &mut [u8],
@ -533,7 +533,7 @@ where
{ {
let mut entries = entries.into_iter(); let mut entries = entries.into_iter();
for ledger_path in ledger_paths { for ledger_path in ledger_paths {
let mut db_ledger = let db_ledger =
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger"); DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
db_ledger db_ledger
.write_entries(slot_height, entries.by_ref()) .write_entries(slot_height, entries.by_ref())
@ -545,7 +545,7 @@ pub fn genesis<'a, I>(ledger_path: &str, keypair: &Keypair, entries: I) -> Resul
where where
I: IntoIterator<Item = &'a Entry>, I: IntoIterator<Item = &'a Entry>,
{ {
let mut db_ledger = DbLedger::open(ledger_path)?; let db_ledger = DbLedger::open(ledger_path)?;
// TODO sign these blobs with keypair // TODO sign these blobs with keypair
let blobs = entries.into_iter().enumerate().map(|(idx, entry)| { let blobs = entries.into_iter().enumerate().map(|(idx, entry)| {
@ -631,7 +631,7 @@ mod tests {
let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
let ledger_path = get_tmp_ledger_path("test_get_blobs_bytes"); let ledger_path = get_tmp_ledger_path("test_get_blobs_bytes");
let mut ledger = DbLedger::open(&ledger_path).unwrap(); let ledger = DbLedger::open(&ledger_path).unwrap();
ledger.write_blobs(&blobs).unwrap(); ledger.write_blobs(&blobs).unwrap();
let mut buf = [0; 1024]; let mut buf = [0; 1024];
@ -814,7 +814,7 @@ mod tests {
// Create RocksDb ledger // Create RocksDb ledger
let db_ledger_path = get_tmp_ledger_path("test_iteration_order"); let db_ledger_path = get_tmp_ledger_path("test_iteration_order");
{ {
let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
// Write entries // Write entries
let num_entries = 8; let num_entries = 8;

View File

@ -273,7 +273,7 @@ pub fn add_blob_to_retransmit_queue(
/// range of blobs to a queue to be sent on to the next stage. /// range of blobs to a queue to be sent on to the next stage.
pub fn process_blob( pub fn process_blob(
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
db_ledger: &Arc<RwLock<DbLedger>>, db_ledger: &Arc<DbLedger>,
blob: &SharedBlob, blob: &SharedBlob,
max_ix: u64, max_ix: u64,
consume_queue: &mut Vec<Entry>, consume_queue: &mut Vec<Entry>,
@ -303,21 +303,15 @@ pub fn process_blob(
let erasure_key = ErasureCf::key(slot, pix); let erasure_key = ErasureCf::key(slot, pix);
let rblob = &blob.read().unwrap(); let rblob = &blob.read().unwrap();
let size = rblob.size()?; let size = rblob.size()?;
{ db_ledger.erasure_cf.put(
let w_db = db_ledger.write().unwrap(); &db_ledger.db,
w_db.erasure_cf.put( &erasure_key,
&w_db.db, &rblob.data[..BLOB_HEADER_SIZE + size],
&erasure_key, )?;
&rblob.data[..BLOB_HEADER_SIZE + size],
)?;
}
vec![] vec![]
} else { } else {
let data_key = DataCf::key(slot, pix); let data_key = DataCf::key(slot, pix);
db_ledger db_ledger.insert_data_blob(&data_key, &blob.read().unwrap())?
.write()
.unwrap()
.insert_data_blob(&data_key, &blob.read().unwrap())?
}; };
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
@ -341,12 +335,10 @@ pub fn process_blob(
// we only want up to a certain index // we only want up to a certain index
// then stop // then stop
if max_ix != 0 && !consumed_entries.is_empty() { if max_ix != 0 && !consumed_entries.is_empty() {
let meta = { let meta = db_ledger
let r_db = db_ledger.read().unwrap(); .meta_cf
r_db.meta_cf .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?
.get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))? .expect("Expect metadata to exist if consumed entries is nonzero");
.expect("Expect metadata to exist if consumed entries is nonzero")
};
let consumed = meta.consumed; let consumed = meta.consumed;
@ -385,12 +377,10 @@ pub fn calculate_max_repair_entry_height(
} }
#[cfg(feature = "erasure")] #[cfg(feature = "erasure")]
fn try_erasure(db_ledger: &Arc<RwLock<DbLedger>>, consume_queue: &mut Vec<Entry>) -> Result<()> { fn try_erasure(db_ledger: &Arc<DbLedger>, consume_queue: &mut Vec<Entry>) -> Result<()> {
let meta = { let meta = db_ledger
let r_db = db_ledger.read().unwrap(); .meta_cf
r_db.meta_cf .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?;
.get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?
};
if let Some(meta) = meta { if let Some(meta) = meta {
let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?; let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?;
@ -401,12 +391,14 @@ fn try_erasure(db_ledger: &Arc<RwLock<DbLedger>>, consume_queue: &mut Vec<Entry>
cl.index().expect("Recovered blob must set index"), cl.index().expect("Recovered blob must set index"),
); );
let size = cl.size().expect("Recovered blob must set size"); let size = cl.size().expect("Recovered blob must set size");
let r_db = db_ledger.read().unwrap(); db_ledger.erasure_cf.put(
r_db.erasure_cf &db_ledger.db,
.put(&r_db.db, &erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?; &erasure_key,
&cl.data[..BLOB_HEADER_SIZE + size],
)?;
} }
let entries = db_ledger.write().unwrap().write_shared_blobs(data)?; let entries = db_ledger.write_shared_blobs(data)?;
consume_queue.extend(entries); consume_queue.extend(entries);
} }
@ -549,7 +541,7 @@ mod test {
// Create RocksDb ledger // Create RocksDb ledger
let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes_sanity"); let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes_sanity");
let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
// Early exit conditions // Early exit conditions
let empty: Vec<u64> = vec![]; let empty: Vec<u64> = vec![];
@ -597,7 +589,7 @@ mod test {
let slot = DEFAULT_SLOT_HEIGHT; let slot = DEFAULT_SLOT_HEIGHT;
// Create RocksDb ledger // Create RocksDb ledger
let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes"); let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes");
let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
// Write entries // Write entries
let gap = 10; let gap = 10;
@ -687,7 +679,7 @@ mod test {
let slot = DEFAULT_SLOT_HEIGHT; let slot = DEFAULT_SLOT_HEIGHT;
// Create RocksDb ledger // Create RocksDb ledger
let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes"); let db_ledger_path = get_tmp_ledger_path("test_find_missing_data_indexes");
let mut db_ledger = DbLedger::open(&db_ledger_path).unwrap(); let db_ledger = DbLedger::open(&db_ledger_path).unwrap();
// Write entries // Write entries
let num_entries = 10; let num_entries = 10;
@ -741,11 +733,7 @@ mod test {
// Generate the db_ledger from the window // Generate the db_ledger from the window
let ledger_path = get_tmp_ledger_path("test_try_erasure"); let ledger_path = get_tmp_ledger_path("test_try_erasure");
let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( let db_ledger = Arc::new(generate_db_ledger_from_window(&ledger_path, &window, false));
&ledger_path,
&window,
false,
)));
let mut consume_queue = vec![]; let mut consume_queue = vec![];
try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt"); try_erasure(&db_ledger, &mut consume_queue).expect("Expected successful erasure attempt");
@ -759,11 +747,10 @@ mod test {
assert_eq!(consume_queue, expected); assert_eq!(consume_queue, expected);
let erased_coding_l = erased_coding.read().unwrap(); let erased_coding_l = erased_coding.read().unwrap();
let r_db = db_ledger.read().unwrap();
assert_eq!( assert_eq!(
&r_db &db_ledger
.erasure_cf .erasure_cf
.get_by_slot_index(&r_db.db, slot_height, erase_offset as u64) .get_by_slot_index(&db_ledger.db, slot_height, erase_offset as u64)
.unwrap() .unwrap()
.unwrap()[BLOB_HEADER_SIZE..], .unwrap()[BLOB_HEADER_SIZE..],
&erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize], &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize],
@ -778,7 +765,7 @@ mod test {
// Create RocksDb ledger // Create RocksDb ledger
let db_ledger_path = get_tmp_ledger_path("test_process_blob"); let db_ledger_path = get_tmp_ledger_path("test_process_blob");
let db_ledger = Arc::new(RwLock::new(DbLedger::open(&db_ledger_path).unwrap())); let db_ledger = Arc::new(DbLedger::open(&db_ledger_path).unwrap());
// Mock the tick height to look like the tick height right after a leader transition // Mock the tick height to look like the tick height right after a leader transition
leader_scheduler.last_seed_height = None; leader_scheduler.last_seed_height = None;

View File

@ -351,7 +351,7 @@ pub fn generate_coding(
// Recover the missing data and coding blobs from the input ledger. Returns a vector // Recover the missing data and coding blobs from the input ledger. Returns a vector
// of the recovered missing data blobs and a vector of the recovered coding blobs // of the recovered missing data blobs and a vector of the recovered coding blobs
pub fn recover( pub fn recover(
db_ledger: &Arc<RwLock<DbLedger>>, db_ledger: &Arc<DbLedger>,
slot: u64, slot: u64,
start_idx: u64, start_idx: u64,
) -> Result<(Vec<SharedBlob>, Vec<SharedBlob>)> { ) -> Result<(Vec<SharedBlob>, Vec<SharedBlob>)> {
@ -367,17 +367,11 @@ pub fn recover(
block_end_idx block_end_idx
); );
let data_missing = find_missing_data_indexes( let data_missing =
slot, find_missing_data_indexes(slot, &db_ledger, block_start_idx, block_end_idx, NUM_DATA).len();
&db_ledger.read().unwrap(),
block_start_idx,
block_end_idx,
NUM_DATA,
)
.len();
let coding_missing = find_missing_coding_indexes( let coding_missing = find_missing_coding_indexes(
slot, slot,
&db_ledger.read().unwrap(), &db_ledger,
coding_start_idx, coding_start_idx,
block_end_idx, block_end_idx,
NUM_CODING, NUM_CODING,
@ -416,10 +410,9 @@ pub fn recover(
// Add the data blobs we have into the recovery vector, mark the missing ones // Add the data blobs we have into the recovery vector, mark the missing ones
for i in block_start_idx..block_end_idx { for i in block_start_idx..block_end_idx {
let result = { let result = db_ledger
let r_db = db_ledger.read().unwrap(); .data_cf
r_db.data_cf.get_by_slot_index(&r_db.db, slot, i)? .get_by_slot_index(&db_ledger.db, slot, i)?;
};
categorize_blob( categorize_blob(
&result, &result,
@ -432,10 +425,9 @@ pub fn recover(
// Add the coding blobs we have into the recovery vector, mark the missing ones // Add the coding blobs we have into the recovery vector, mark the missing ones
for i in coding_start_idx..block_end_idx { for i in coding_start_idx..block_end_idx {
let result = { let result = db_ledger
let r_db = db_ledger.read().unwrap(); .erasure_cf
r_db.erasure_cf.get_by_slot_index(&r_db.db, slot, i)? .get_by_slot_index(&db_ledger.db, slot, i)?;
};
categorize_blob( categorize_blob(
&result, &result,
@ -528,10 +520,9 @@ pub fn recover(
// Remove the corrupted coding blobs so there's no effort wasted in trying to reconstruct // Remove the corrupted coding blobs so there's no effort wasted in trying to reconstruct
// the blobs again // the blobs again
for i in coding_start_idx..block_end_idx { for i in coding_start_idx..block_end_idx {
{ db_ledger
let r_db = db_ledger.read().unwrap(); .erasure_cf
r_db.erasure_cf.delete_by_slot_index(&r_db.db, slot, i)?; .delete_by_slot_index(&db_ledger.db, slot, i)?;
}
} }
return Ok((vec![], vec![])); return Ok((vec![], vec![]));
} }
@ -576,7 +567,7 @@ pub mod test {
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::{Arc, RwLock}; use std::sync::Arc;
#[test] #[test]
pub fn test_coding() { pub fn test_coding() {
@ -636,7 +627,7 @@ pub mod test {
window: &[WindowSlot], window: &[WindowSlot],
use_random: bool, use_random: bool,
) -> DbLedger { ) -> DbLedger {
let mut db_ledger = let db_ledger =
DbLedger::open(ledger_path).expect("Expected to be able to open database ledger"); DbLedger::open(ledger_path).expect("Expected to be able to open database ledger");
for slot in window { for slot in window {
if let Some(ref data) = slot.data { if let Some(ref data) = slot.data {
@ -842,11 +833,7 @@ pub mod test {
// Generate the db_ledger from the window // Generate the db_ledger from the window
let ledger_path = get_tmp_ledger_path("test_window_recover_basic"); let ledger_path = get_tmp_ledger_path("test_window_recover_basic");
let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( let db_ledger = Arc::new(generate_db_ledger_from_window(&ledger_path, &window, true));
&ledger_path,
&window,
true,
)));
// Recover it from coding // Recover it from coding
let (recovered_data, recovered_coding) = recover(&db_ledger, 0, offset as u64) let (recovered_data, recovered_coding) = recover(&db_ledger, 0, offset as u64)
@ -896,11 +883,7 @@ pub mod test {
window[erase_offset].data = None; window[erase_offset].data = None;
window[erase_offset].coding = None; window[erase_offset].coding = None;
let ledger_path = get_tmp_ledger_path("test_window_recover_basic2"); let ledger_path = get_tmp_ledger_path("test_window_recover_basic2");
let db_ledger = Arc::new(RwLock::new(generate_db_ledger_from_window( let db_ledger = Arc::new(generate_db_ledger_from_window(&ledger_path, &window, true));
&ledger_path,
&window,
true,
)));
// Recover it from coding // Recover it from coding
let (recovered_data, recovered_coding) = recover(&db_ledger, 0, offset as u64) let (recovered_data, recovered_coding) = recover(&db_ledger, 0, offset as u64)

View File

@ -108,7 +108,7 @@ pub struct Fullnode {
broadcast_socket: UdpSocket, broadcast_socket: UdpSocket,
rpc_addr: SocketAddr, rpc_addr: SocketAddr,
rpc_pubsub_addr: SocketAddr, rpc_pubsub_addr: SocketAddr,
db_ledger: Arc<RwLock<DbLedger>>, db_ledger: Arc<DbLedger>,
} }
impl Fullnode { impl Fullnode {
@ -587,7 +587,7 @@ impl Fullnode {
) )
} }
fn make_db_ledger(ledger_path: &str) -> Arc<RwLock<DbLedger>> { fn make_db_ledger(ledger_path: &str) -> Arc<DbLedger> {
// Destroy any existing instances of the RocksDb ledger // Destroy any existing instances of the RocksDb ledger
DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
let ledger_entries = read_ledger(ledger_path, true) let ledger_entries = read_ledger(ledger_path, true)
@ -597,7 +597,7 @@ impl Fullnode {
write_entries_to_ledger(&[ledger_path], ledger_entries, DEFAULT_SLOT_HEIGHT); write_entries_to_ledger(&[ledger_path], ledger_entries, DEFAULT_SLOT_HEIGHT);
let db = let db =
DbLedger::open(ledger_path).expect("Expected to successfully open database ledger"); DbLedger::open(ledger_path).expect("Expected to successfully open database ledger");
Arc::new(RwLock::new(db)) Arc::new(db)
} }
} }

View File

@ -18,7 +18,7 @@ pub struct GossipService {
impl GossipService { impl GossipService {
pub fn new( pub fn new(
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
db_ledger: Option<Arc<RwLock<DbLedger>>>, db_ledger: Option<Arc<DbLedger>>,
gossip_socket: UdpSocket, gossip_socket: UdpSocket,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {

View File

@ -113,10 +113,10 @@ impl Replicator {
// RocksDb. Note for now, this ledger will not contain any of the existing entries // RocksDb. Note for now, this ledger will not contain any of the existing entries
// in the ledger located at ledger_path, and will only append on newly received // in the ledger located at ledger_path, and will only append on newly received
// entries after being passed to window_service // entries after being passed to window_service
let db_ledger = Arc::new(RwLock::new( let db_ledger = Arc::new(
DbLedger::open(&ledger_path.unwrap()) DbLedger::open(&ledger_path.unwrap())
.expect("Expected to be able to open database ledger"), .expect("Expected to be able to open database ledger"),
)); );
let gossip_service = GossipService::new( let gossip_service = GossipService::new(
&cluster_info, &cluster_info,

View File

@ -83,7 +83,7 @@ pub struct RetransmitStage {
impl RetransmitStage { impl RetransmitStage {
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new( pub fn new(
db_ledger: Arc<RwLock<DbLedger>>, db_ledger: Arc<DbLedger>,
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
tick_height: u64, tick_height: u64,
entry_height: u64, entry_height: u64,

View File

@ -70,7 +70,7 @@ impl Tvu {
cluster_info: &Arc<RwLock<ClusterInfo>>, cluster_info: &Arc<RwLock<ClusterInfo>>,
sockets: Sockets, sockets: Sockets,
ledger_path: Option<&str>, ledger_path: Option<&str>,
db_ledger: Arc<RwLock<DbLedger>>, db_ledger: Arc<DbLedger>,
) -> Self { ) -> Self {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let keypair: Arc<Keypair> = cluster_info let keypair: Arc<Keypair> = cluster_info
@ -294,7 +294,7 @@ pub mod tests {
} }
}, },
None, None,
Arc::new(RwLock::new(db_ledger)), Arc::new(db_ledger),
); );
let mut alice_ref_balance = starting_balance; let mut alice_ref_balance = starting_balance;

View File

@ -14,7 +14,6 @@ use rand::{thread_rng, Rng};
use solana_metrics::{influxdb, submit}; use solana_metrics::{influxdb, submit};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::duration_as_ms; use solana_sdk::timing::duration_as_ms;
use std::borrow::Borrow;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::RecvTimeoutError;
@ -52,7 +51,7 @@ fn repair_backoff(last: &mut u64, times: &mut usize, consumed: u64) -> bool {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn recv_window( fn recv_window(
db_ledger: &Arc<RwLock<DbLedger>>, db_ledger: &Arc<DbLedger>,
id: &Pubkey, id: &Pubkey,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>, leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
tick_height: &mut u64, tick_height: &mut u64,
@ -123,7 +122,7 @@ fn recv_window(
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn window_service( pub fn window_service(
db_ledger: Arc<RwLock<DbLedger>>, db_ledger: Arc<DbLedger>,
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
tick_height: u64, tick_height: u64,
entry_height: u64, entry_height: u64,
@ -165,13 +164,9 @@ pub fn window_service(
} }
} }
let meta = { let meta = db_ledger
let rlock = db_ledger.read().unwrap(); .meta_cf
.get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT));
rlock
.meta_cf
.get(&rlock.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
};
if let Ok(Some(meta)) = meta { if let Ok(Some(meta)) = meta {
let received = meta.received; let received = meta.received;
@ -203,7 +198,7 @@ pub fn window_service(
trace!("{} let's repair! times = {}", id, times); trace!("{} let's repair! times = {}", id, times);
let reqs = repair( let reqs = repair(
db_ledger.read().unwrap().borrow(), &db_ledger,
&cluster_info, &cluster_info,
&id, &id,
times, times,
@ -277,9 +272,9 @@ mod test {
let (s_retransmit, r_retransmit) = channel(); let (s_retransmit, r_retransmit) = channel();
let done = Arc::new(AtomicBool::new(false)); let done = Arc::new(AtomicBool::new(false));
let db_ledger_path = get_tmp_ledger_path("window_send_test"); let db_ledger_path = get_tmp_ledger_path("window_send_test");
let db_ledger = Arc::new(RwLock::new( let db_ledger = Arc::new(
DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"),
)); );
let t_window = window_service( let t_window = window_service(
db_ledger, db_ledger,
subs, subs,
@ -347,9 +342,9 @@ mod test {
let (s_retransmit, r_retransmit) = channel(); let (s_retransmit, r_retransmit) = channel();
let done = Arc::new(AtomicBool::new(false)); let done = Arc::new(AtomicBool::new(false));
let db_ledger_path = get_tmp_ledger_path("window_send_late_leader_test"); let db_ledger_path = get_tmp_ledger_path("window_send_late_leader_test");
let db_ledger = Arc::new(RwLock::new( let db_ledger = Arc::new(
DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"), DbLedger::open(&db_ledger_path).expect("Expected to be able to open database ledger"),
)); );
let t_window = window_service( let t_window = window_service(
db_ledger, db_ledger,
subs.clone(), subs.clone(),