diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index b2d1c56a87..32d36a8eed 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -16,7 +16,6 @@ use solana::logger; use solana::service::Service; use solana::signature::GenKeys; use solana::thin_client::{poll_gossip_for_leader, ThinClient}; -use solana::window::default_window; use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT}; use solana_metrics::influxdb; use solana_sdk::hash::Hash; @@ -850,9 +849,7 @@ fn converge( spy_cluster_info.insert_info(leader.clone()); spy_cluster_info.set_leader(leader.id); let spy_ref = Arc::new(RwLock::new(spy_cluster_info)); - let window = Arc::new(RwLock::new(default_window())); - let gossip_service = - GossipService::new(&spy_ref, window, None, gossip_socket, exit_signal.clone()); + let gossip_service = GossipService::new(&spy_ref, None, gossip_socket, exit_signal.clone()); let mut v: Vec = vec![]; // wait for the network to converge, 30 seconds should be plenty for _ in 0..30 { diff --git a/src/cluster_info.rs b/src/cluster_info.rs index b65e3a3fa6..d7060b6838 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -19,6 +19,7 @@ use crate::crds_gossip::CrdsGossip; use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; use crate::crds_value::{CrdsValue, CrdsValueLabel, LeaderId}; +use crate::db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT}; use crate::ledger::LedgerWindow; use crate::netutil::{bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range}; use crate::packet::{to_blob, Blob, SharedBlob, BLOB_SIZE}; @@ -673,79 +674,43 @@ impl ClusterInfo { fn run_window_request( from: &NodeInfo, from_addr: &SocketAddr, - window: &SharedWindow, - ledger_window: &mut Option<&mut LedgerWindow>, + db_ledger: Option<&Arc>>, me: &NodeInfo, leader_id: Pubkey, ix: u64, ) -> Vec { - let pos = (ix as usize) % window.read().unwrap().len(); - if let Some(ref mut blob) = &mut window.write().unwrap()[pos].data { - let mut wblob = blob.write().unwrap(); - let blob_ix = wblob.index().expect("run_window_request index"); - if blob_ix == ix { - let num_retransmits = wblob.meta.num_retransmits; - wblob.meta.num_retransmits += 1; - // Setting the sender id to the requester id - // prevents the requester from retransmitting this response - // to other peers - let mut sender_id = from.id; + if let Some(db_ledger) = db_ledger { + let meta = { + let r_db = db_ledger.read().unwrap(); - // Allow retransmission of this response if the node - // is the leader and the number of repair requests equals - // a power of two - if leader_id == me.id && (num_retransmits == 0 || num_retransmits.is_power_of_two()) - { - sender_id = me.id + r_db.meta_cf + .get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) + }; + + if let Ok(Some(meta)) = meta { + let max_slot = meta.received_slot; + // Try to find the requested index in one of the slots + for i in 0..=max_slot { + let get_result = { + let r_db = db_ledger.read().unwrap(); + r_db.data_cf.get_by_slot_index(&r_db.db, i, ix) + }; + + if let Ok(Some(blob_data)) = get_result { + inc_new_counter_info!("cluster_info-window-request-ledger", 1); + let mut blob = Blob::new(&blob_data); + blob.set_index(ix).expect("set_index()"); + blob.set_id(&me.id).expect("set_id()"); // causes retransmission if I'm the leader + blob.meta.set_addr(from_addr); + + return vec![Arc::new(RwLock::new(blob))]; + } } - - let out = SharedBlob::default(); - - // copy to avoid doing IO inside the lock - { - let mut outblob = out.write().unwrap(); - let sz = wblob.meta.size; - outblob.meta.size = sz; - outblob.data[..sz].copy_from_slice(&wblob.data[..sz]); - outblob.meta.set_addr(from_addr); - outblob.set_id(&sender_id).expect("blob set_id"); - } - inc_new_counter_info!("cluster_info-window-request-pass", 1); - - return vec![out]; - } else { - inc_new_counter_info!("cluster_info-window-request-outside", 1); - trace!( - "requested ix {} != blob_ix {}, outside window!", - ix, - blob_ix - ); - // falls through to checking window_ledger - } - } - - if let Some(ledger_window) = ledger_window { - if let Ok(entry) = ledger_window.get_entry(ix) { - inc_new_counter_info!("cluster_info-window-request-ledger", 1); - - let out = entry.to_blob( - Some(ix), - Some(me.id), // causes retransmission if I'm the leader - Some(from_addr), - ); - - return vec![out]; } } inc_new_counter_info!("cluster_info-window-request-fail", 1); - trace!( - "{}: failed RequestWindowIndex {} {} {}", - me.id, - from.id, - ix, - pos, - ); + trace!("{}: failed RequestWindowIndex {} {}", me.id, from.id, ix,); vec![] } @@ -753,17 +718,17 @@ impl ClusterInfo { //TODO we should first coalesce all the requests fn handle_blob( obj: &Arc>, - window: &SharedWindow, - ledger_window: &mut Option<&mut LedgerWindow>, + db_ledger: Option<&Arc>>, blob: &Blob, ) -> Vec { deserialize(&blob.data[..blob.meta.size]) .into_iter() .flat_map(|request| { - ClusterInfo::handle_protocol(obj, &blob.meta.addr(), request, window, ledger_window) + ClusterInfo::handle_protocol(obj, &blob.meta.addr(), db_ledger, request) }) .collect() } + fn handle_pull_request( me: &Arc>, filter: Bloom, @@ -867,10 +832,9 @@ impl ClusterInfo { fn handle_request_window_index( me: &Arc>, from: &ContactInfo, + db_ledger: Option<&Arc>>, ix: u64, from_addr: &SocketAddr, - window: &SharedWindow, - ledger_window: &mut Option<&mut LedgerWindow>, ) -> Vec { let now = Instant::now(); @@ -898,15 +862,7 @@ impl ClusterInfo { from.id, ix, ); - let res = Self::run_window_request( - &from, - &from_addr, - &window, - ledger_window, - &my_info, - leader_id, - ix, - ); + let res = Self::run_window_request(&from, &from_addr, db_ledger, &my_info, leader_id, ix); report_time_spent( "RequestWindowIndex", &now.elapsed(), @@ -917,9 +873,8 @@ impl ClusterInfo { fn handle_protocol( me: &Arc>, from_addr: &SocketAddr, + db_ledger: Option<&Arc>>, request: Protocol, - window: &SharedWindow, - ledger_window: &mut Option<&mut LedgerWindow>, ) -> Vec { match request { // TODO verify messages faster @@ -960,7 +915,7 @@ impl ClusterInfo { vec![] } Protocol::RequestWindowIndex(from, ix) => { - Self::handle_request_window_index(me, &from, ix, from_addr, window, ledger_window) + Self::handle_request_window_index(me, &from, db_ledger, ix, from_addr) } } } @@ -968,8 +923,7 @@ impl ClusterInfo { /// Process messages from the network fn run_listen( obj: &Arc>, - window: &SharedWindow, - ledger_window: &mut Option<&mut LedgerWindow>, + db_ledger: Option<&Arc>>, requests_receiver: &BlobReceiver, response_sender: &BlobSender, ) -> Result<()> { @@ -981,7 +935,7 @@ impl ClusterInfo { } let mut resps = Vec::new(); for req in reqs { - let mut resp = Self::handle_blob(obj, window, ledger_window, &req.read().unwrap()); + let mut resp = Self::handle_blob(obj, db_ledger, &req.read().unwrap()); resps.append(&mut resp); } response_sender.send(resps)?; @@ -989,21 +943,17 @@ impl ClusterInfo { } pub fn listen( me: Arc>, - window: SharedWindow, - ledger_path: Option<&str>, + db_ledger: Option>>, requests_receiver: BlobReceiver, response_sender: BlobSender, exit: Arc, ) -> JoinHandle<()> { - let mut ledger_window = ledger_path.map(|p| LedgerWindow::open(p).unwrap()); - Builder::new() .name("solana-listen".to_string()) .spawn(move || loop { let e = Self::run_listen( &me, - &window, - &mut ledger_window.as_mut(), + db_ledger.as_ref(), &requests_receiver, &response_sender, ); @@ -1160,12 +1110,12 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { mod tests { use super::*; use crate::crds_value::CrdsValueLabel; + use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; use crate::entry::Entry; use crate::ledger::{get_tmp_ledger_path, LedgerWindow, LedgerWriter}; use crate::logger; - use crate::packet::SharedBlob; use crate::result::Error; - use crate::window::default_window; + use packet::{Blob, BLOB_HEADER_SIZE}; use solana_sdk::hash::{hash, Hash}; use solana_sdk::signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; @@ -1263,140 +1213,62 @@ mod tests { #[test] fn run_window_request() { logger::setup(); - let window = Arc::new(RwLock::new(default_window())); - let me = NodeInfo::new( - Keypair::new().pubkey(), - socketaddr!("127.0.0.1:1234"), - socketaddr!("127.0.0.1:1235"), - socketaddr!("127.0.0.1:1236"), - socketaddr!("127.0.0.1:1237"), - socketaddr!("127.0.0.1:1238"), - socketaddr!("127.0.0.1:1239"), - 0, - ); - let leader_id = me.id; - let rv = ClusterInfo::run_window_request( - &me, - &socketaddr_any!(), - &window, - &mut None, - &me, - leader_id, - 0, - ); - assert!(rv.is_empty()); - let out = SharedBlob::default(); - out.write().unwrap().meta.size = 200; - window.write().unwrap()[0].data = Some(out); - let rv = ClusterInfo::run_window_request( - &me, - &socketaddr_any!(), - &window, - &mut None, - &me, - leader_id, - 0, - ); - assert!(!rv.is_empty()); - let v = rv[0].clone(); - //test we copied the blob - assert_eq!(v.read().unwrap().meta.size, 200); - let len = window.read().unwrap().len() as u64; - let rv = ClusterInfo::run_window_request( - &me, - &socketaddr_any!(), - &window, - &mut None, - &me, - leader_id, - len, - ); - assert!(rv.is_empty()); - - fn tmp_ledger(name: &str) -> String { - let path = get_tmp_ledger_path(name); - - let mut writer = LedgerWriter::open(&path, true).unwrap(); - let zero = Hash::default(); - let one = hash(&zero.as_ref()); - writer - .write_entries( - &vec![ - Entry::new_tick(&zero, 0, 0, &zero), - Entry::new_tick(&one, 1, 0, &one), - ] - .to_vec(), - ) - .unwrap(); - path - } - - let ledger_path = tmp_ledger("run_window_request"); - let mut ledger_window = LedgerWindow::open(&ledger_path).unwrap(); - - let rv = ClusterInfo::run_window_request( - &me, - &socketaddr_any!(), - &window, - &mut Some(&mut ledger_window), - &me, - leader_id, - 1, - ); - assert!(!rv.is_empty()); - - remove_dir_all(ledger_path).unwrap(); - } - - /// test window requests respond with the right blob, and do not overrun - #[test] - fn run_window_request_with_backoff() { - let window = Arc::new(RwLock::new(default_window())); - - let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - let leader_id = me.id; - - let mock_peer = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - - // Simulate handling a repair request from mock_peer - let rv = ClusterInfo::run_window_request( - &mock_peer, - &socketaddr_any!(), - &window, - &mut None, - &me, - leader_id, - 0, - ); - assert!(rv.is_empty()); - let blob = SharedBlob::default(); - let blob_size = 200; - blob.write().unwrap().meta.size = blob_size; - window.write().unwrap()[0].data = Some(blob); - - let num_requests: u32 = 64; - for i in 0..num_requests { - let shared_blob = ClusterInfo::run_window_request( - &mock_peer, + let ledger_path = get_tmp_ledger_path("run_window_request"); + { + let db_ledger = Arc::new(RwLock::new(DbLedger::open(&ledger_path).unwrap())); + let me = NodeInfo::new( + Keypair::new().pubkey(), + socketaddr!("127.0.0.1:1234"), + socketaddr!("127.0.0.1:1235"), + socketaddr!("127.0.0.1:1236"), + socketaddr!("127.0.0.1:1237"), + socketaddr!("127.0.0.1:1238"), + socketaddr!("127.0.0.1:1239"), + 0, + ); + let leader_id = me.id; + let rv = ClusterInfo::run_window_request( + &me, &socketaddr_any!(), - &window, - &mut None, + Some(&db_ledger), &me, leader_id, 0, - )[0] - .clone(); - let blob = shared_blob.read().unwrap(); - // Test we copied the blob - assert_eq!(blob.meta.size, blob_size); + ); + assert!(rv.is_empty()); + let data_size = 1; + let blob = SharedBlob::default(); + { + let mut w_blob = blob.write().unwrap(); + w_blob.set_size(data_size); + w_blob.set_index(1).expect("set_index()"); + w_blob.set_slot(2).expect("set_slot()"); + w_blob.meta.size = data_size + BLOB_HEADER_SIZE; + } - let id = if i == 0 || i.is_power_of_two() { - me.id - } else { - mock_peer.id - }; - assert_eq!(blob.id().unwrap(), id); + { + let mut w_ledger = db_ledger.write().unwrap(); + w_ledger + .write_shared_blobs(2, vec![&blob]) + .expect("Expect successful ledger write"); + } + + let rv = ClusterInfo::run_window_request( + &me, + &socketaddr_any!(), + Some(&db_ledger), + &me, + leader_id, + 1, + ); + assert!(!rv.is_empty()); + let v = rv[0].clone(); + assert_eq!(v.read().unwrap().index().unwrap(), 1); + assert_eq!(v.read().unwrap().slot().unwrap(), 2); + assert_eq!(v.read().unwrap().meta.size, BLOB_HEADER_SIZE + data_size); } + + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); } #[test] diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 6b5f42ddfb..23a05e0efb 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -793,6 +793,50 @@ mod tests { DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); } + #[test] + fn test_insert_data_blobs_slots() { + let num_blobs = 10; + let entries = make_tiny_test_entries(num_blobs); + let shared_blobs = entries.to_blobs(); + let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect(); + let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect(); + + let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_slots"); + let ledger = DbLedger::open(&ledger_path).unwrap(); + + // Insert last blob into next slot + let result = ledger + .insert_data_blob( + &DataCf::key(DEFAULT_SLOT_HEIGHT + 1, (num_blobs - 1) as u64), + blobs.last().unwrap(), + ) + .unwrap(); + assert_eq!(result.len(), 0); + + // Insert blobs into first slot, check for consecutive blobs + for i in (0..num_blobs - 1).rev() { + let result = ledger + .insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, i as u64), blobs[i]) + .unwrap(); + let meta = ledger + .meta_cf + .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) + .unwrap() + .expect("Expected metadata object to exist"); + if i != 0 { + assert_eq!(result.len(), 0); + assert!(meta.consumed == 0 && meta.received == num_blobs as u64); + } else { + assert_eq!(result, entries); + assert!(meta.consumed == num_blobs as u64 && meta.received == num_blobs as u64); + } + } + + // Destroying database without closing it first is undefined behavior + drop(ledger); + DbLedger::destroy(&ledger_path).expect("Expected successful database destruction"); + } + #[test] pub fn test_iteration_order() { let slot = 0; diff --git a/src/fullnode.rs b/src/fullnode.rs index f6a2ea0b5f..872949cc1e 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -216,6 +216,9 @@ impl Fullnode { sigverify_disabled: bool, rpc_port: Option, ) -> Self { + // Create the RocksDb ledger + let db_ledger = Self::make_db_ledger(ledger_path); + let mut rpc_addr = node.info.rpc; let mut rpc_pubsub_addr = node.info.rpc_pubsub; // Use custom RPC port, if provided (`Some(port)`) @@ -244,8 +247,7 @@ impl Fullnode { let gossip_service = GossipService::new( &cluster_info, - shared_window.clone(), - Some(ledger_path), + Some(db_ledger.clone()), node.sockets.gossip, exit.clone(), ); @@ -266,9 +268,6 @@ impl Fullnode { cluster_info.write().unwrap().set_leader(scheduled_leader); - // Create the RocksDb ledger - let db_ledger = Self::make_db_ledger(ledger_path); - let node_role = if scheduled_leader != keypair.pubkey() { // Start in validator mode. let sockets = Sockets { diff --git a/src/gossip_service.rs b/src/gossip_service.rs index a8d48c4899..bde0cf04cf 100644 --- a/src/gossip_service.rs +++ b/src/gossip_service.rs @@ -1,9 +1,9 @@ //! The `gossip_service` module implements the network control plane. use crate::cluster_info::ClusterInfo; +use crate::db_ledger::DbLedger; use crate::service::Service; use crate::streamer; -use crate::window::SharedWindow; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; @@ -18,8 +18,7 @@ pub struct GossipService { impl GossipService { pub fn new( cluster_info: &Arc>, - window: SharedWindow, - ledger_path: Option<&str>, + db_ledger: Option>>, gossip_socket: UdpSocket, exit: Arc, ) -> Self { @@ -36,8 +35,7 @@ impl GossipService { let t_responder = streamer::responder("gossip", gossip_socket, response_receiver); let t_listen = ClusterInfo::listen( cluster_info.clone(), - window, - ledger_path, + db_ledger, request_receiver, response_sender.clone(), exit.clone(), @@ -79,8 +77,7 @@ mod tests { let tn = Node::new_localhost(); let cluster_info = ClusterInfo::new(tn.info.clone()); let c = Arc::new(RwLock::new(cluster_info)); - let w = Arc::new(RwLock::new(vec![])); - let d = GossipService::new(&c, w, None, tn.sockets.gossip, exit.clone()); + let d = GossipService::new(&c, None, tn.sockets.gossip, exit.clone()); d.close().expect("thread join"); } } diff --git a/src/packet.rs b/src/packet.rs index da49f42c22..ea8ac79f51 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -363,7 +363,9 @@ impl Blob { } pub fn size(&self) -> Result { let size = self.data_size()? as usize; - if self.meta.size == size { + if size <= BLOB_HEADER_SIZE { + Err(Error::BlobError(BlobError::BadState)) + } else if self.meta.size == size { Ok(size - BLOB_HEADER_SIZE) } else { Err(Error::BlobError(BlobError::BadState)) diff --git a/src/replicator.rs b/src/replicator.rs index 4252f9f099..cd291d0d8b 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -99,7 +99,6 @@ impl Replicator { const REPLICATOR_WINDOW_SIZE: usize = 32 * 1024; let window = window::new_window(REPLICATOR_WINDOW_SIZE); - let shared_window = Arc::new(RwLock::new(window)); info!("Replicator: id: {}", keypair.pubkey()); info!("Creating cluster info...."); @@ -127,8 +126,7 @@ impl Replicator { let gossip_service = GossipService::new( &cluster_info, - shared_window.clone(), - ledger_path, + Some(db_ledger.clone()), node.sockets.gossip, exit.clone(), ); diff --git a/src/thin_client.rs b/src/thin_client.rs index 669f215a94..0395c27c1a 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -349,14 +349,8 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R let (node, gossip_socket) = ClusterInfo::spy_node(); let my_addr = gossip_socket.local_addr().unwrap(); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node))); - let window = Arc::new(RwLock::new(vec![])); - let gossip_service = GossipService::new( - &cluster_info.clone(), - window, - None, - gossip_socket, - exit.clone(), - ); + let gossip_service = + GossipService::new(&cluster_info.clone(), None, gossip_socket, exit.clone()); let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp); cluster_info diff --git a/src/tvu.rs b/src/tvu.rs index f52cd4e6e9..05a8a324a5 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -211,7 +211,7 @@ pub mod tests { exit: Arc, ) -> (GossipService, SharedWindow) { let window = Arc::new(RwLock::new(window::default_window())); - let gossip_service = GossipService::new(&cluster_info, window.clone(), None, gossip, exit); + let gossip_service = GossipService::new(&cluster_info, None, gossip, exit); (gossip_service, window) } diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 81d48b5c58..6ba10820f9 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -21,8 +21,7 @@ fn test_node(exit: Arc) -> (Arc>, GossipService, let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair)); let c = Arc::new(RwLock::new(cluster_info)); - let w = Arc::new(RwLock::new(vec![])); - let d = GossipService::new(&c.clone(), w, None, tn.sockets.gossip, exit); + let d = GossipService::new(&c.clone(), None, tn.sockets.gossip, exit); let _ = c.read().unwrap().my_data(); (c, d, tn.sockets.tvu.pop().unwrap()) } diff --git a/tests/multinode.rs b/tests/multinode.rs index 5557a5ef62..f8acd99667 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -49,10 +49,8 @@ fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc>, spy_cluster_info.insert_info(leader.clone()); spy_cluster_info.set_leader(leader.id); let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info)); - let spy_window = Arc::new(RwLock::new(default_window())); let gossip_service = GossipService::new( &spy_cluster_info_ref, - spy_window, None, spy.sockets.gossip, exit.clone(), @@ -73,10 +71,8 @@ fn make_listening_node( new_node_cluster_info.insert_info(leader.clone()); new_node_cluster_info.set_leader(leader.id); let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info)); - let new_node_window = Arc::new(RwLock::new(default_window())); let gossip_service = GossipService::new( &new_node_cluster_info_ref, - new_node_window, None, new_node .sockets