Make a dummy version of serving repairs from db_ledger

Carl authored 2018-12-10 01:24:41 -08:00; committed by Grimes
parent b1b190b80d
commit 245362db96
11 changed files with 152 additions and 254 deletions
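Before the per-file hunks, the shape of the change: repair requests (Protocol::RequestWindowIndex) are now answered out of the RocksDB-backed DbLedger instead of the in-memory SharedWindow (with its LedgerWindow fallback), so GossipService and ClusterInfo::listen take an Option around Arc&lt;RwLock&lt;DbLedger&gt;&gt; in place of the old window and ledger_path parameters. A minimal sketch of the new lookup, distilled from the cluster_info.rs hunk below; serve_repair is a hypothetical free function, while the meta_cf/data_cf calls are the ones the diff itself uses:

    // Sketch only: the real code lives in ClusterInfo::run_window_request below.
    fn serve_repair(db_ledger: &DbLedger, ix: u64) -> Option<Blob> {
        // The meta column family records the highest slot received so far.
        let meta = db_ledger
            .meta_cf
            .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
            .ok()??;
        // Dummy version: linearly scan every slot for the requested blob index.
        for slot in 0..=meta.received_slot {
            if let Ok(Some(data)) = db_ledger.data_cf.get_by_slot_index(&db_ledger.db, slot, ix) {
                return Some(Blob::new(&data)); // caller re-stamps index/id and the return address
            }
        }
        None
    }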

View File

@@ -16,7 +16,6 @@ use solana::logger;
 use solana::service::Service;
 use solana::signature::GenKeys;
 use solana::thin_client::{poll_gossip_for_leader, ThinClient};
-use solana::window::default_window;
 use solana_drone::drone::{request_airdrop_transaction, DRONE_PORT};
 use solana_metrics::influxdb;
 use solana_sdk::hash::Hash;
@@ -850,9 +849,7 @@ fn converge(
     spy_cluster_info.insert_info(leader.clone());
     spy_cluster_info.set_leader(leader.id);
     let spy_ref = Arc::new(RwLock::new(spy_cluster_info));
-    let window = Arc::new(RwLock::new(default_window()));
-    let gossip_service =
-        GossipService::new(&spy_ref, window, None, gossip_socket, exit_signal.clone());
+    let gossip_service = GossipService::new(&spy_ref, None, gossip_socket, exit_signal.clone());
     let mut v: Vec<NodeInfo> = vec![];
     // wait for the network to converge, 30 seconds should be plenty
     for _ in 0..30 {

View File

@@ -19,6 +19,7 @@ use crate::crds_gossip::CrdsGossip;
 use crate::crds_gossip_error::CrdsGossipError;
 use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
 use crate::crds_value::{CrdsValue, CrdsValueLabel, LeaderId};
+use crate::db_ledger::{DbLedger, LedgerColumnFamily, MetaCf, DEFAULT_SLOT_HEIGHT};
 use crate::ledger::LedgerWindow;
 use crate::netutil::{bind_in_range, bind_to, find_available_port_in_range, multi_bind_in_range};
 use crate::packet::{to_blob, Blob, SharedBlob, BLOB_SIZE};
@@ -673,79 +674,43 @@ impl ClusterInfo {
     fn run_window_request(
         from: &NodeInfo,
         from_addr: &SocketAddr,
-        window: &SharedWindow,
-        ledger_window: &mut Option<&mut LedgerWindow>,
+        db_ledger: Option<&Arc<RwLock<DbLedger>>>,
         me: &NodeInfo,
         leader_id: Pubkey,
         ix: u64,
     ) -> Vec<SharedBlob> {
-        let pos = (ix as usize) % window.read().unwrap().len();
-        if let Some(ref mut blob) = &mut window.write().unwrap()[pos].data {
-            let mut wblob = blob.write().unwrap();
-            let blob_ix = wblob.index().expect("run_window_request index");
-            if blob_ix == ix {
-                let num_retransmits = wblob.meta.num_retransmits;
-                wblob.meta.num_retransmits += 1;
-                // Setting the sender id to the requester id
-                // prevents the requester from retransmitting this response
-                // to other peers
-                let mut sender_id = from.id;
-
-                // Allow retransmission of this response if the node
-                // is the leader and the number of repair requests equals
-                // a power of two
-                if leader_id == me.id && (num_retransmits == 0 || num_retransmits.is_power_of_two())
-                {
-                    sender_id = me.id
-                }
-
-                let out = SharedBlob::default();
-
-                // copy to avoid doing IO inside the lock
-                {
-                    let mut outblob = out.write().unwrap();
-                    let sz = wblob.meta.size;
-                    outblob.meta.size = sz;
-                    outblob.data[..sz].copy_from_slice(&wblob.data[..sz]);
-                    outblob.meta.set_addr(from_addr);
-                    outblob.set_id(&sender_id).expect("blob set_id");
-                }
-                inc_new_counter_info!("cluster_info-window-request-pass", 1);
-                return vec![out];
-            } else {
-                inc_new_counter_info!("cluster_info-window-request-outside", 1);
-                trace!(
-                    "requested ix {} != blob_ix {}, outside window!",
-                    ix,
-                    blob_ix
-                );
-                // falls through to checking window_ledger
-            }
-        }
-
-        if let Some(ledger_window) = ledger_window {
-            if let Ok(entry) = ledger_window.get_entry(ix) {
-                inc_new_counter_info!("cluster_info-window-request-ledger", 1);
-                let out = entry.to_blob(
-                    Some(ix),
-                    Some(me.id), // causes retransmission if I'm the leader
-                    Some(from_addr),
-                );
-                return vec![out];
-            }
-        }
+        if let Some(db_ledger) = db_ledger {
+            let meta = {
+                let r_db = db_ledger.read().unwrap();
+                r_db.meta_cf
+                    .get(&r_db.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
+            };
+
+            if let Ok(Some(meta)) = meta {
+                let max_slot = meta.received_slot;
+                // Try to find the requested index in one of the slots
+                for i in 0..=max_slot {
+                    let get_result = {
+                        let r_db = db_ledger.read().unwrap();
+                        r_db.data_cf.get_by_slot_index(&r_db.db, i, ix)
+                    };
+
+                    if let Ok(Some(blob_data)) = get_result {
+                        inc_new_counter_info!("cluster_info-window-request-ledger", 1);
+                        let mut blob = Blob::new(&blob_data);
+                        blob.set_index(ix).expect("set_index()");
+                        blob.set_id(&me.id).expect("set_id()"); // causes retransmission if I'm the leader
+                        blob.meta.set_addr(from_addr);
+                        return vec![Arc::new(RwLock::new(blob))];
+                    }
+                }
+            }
+        }
 
         inc_new_counter_info!("cluster_info-window-request-fail", 1);
-        trace!(
-            "{}: failed RequestWindowIndex {} {} {}",
-            me.id,
-            from.id,
-            ix,
-            pos,
-        );
+        trace!("{}: failed RequestWindowIndex {} {}", me.id, from.id, ix,);
 
         vec![]
     }
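One thing the dummy version drops from the deleted window path above: retransmit backoff. Responses used to be stamped with the requester's id, which suppressed further retransmission, and only the leader re-enabled it on an exponential schedule. A sketch of that deleted rule; response_sender_id is a hypothetical helper name, but the condition is verbatim from the removed lines:

    use solana_sdk::pubkey::Pubkey;

    // The leader stamps its own id on the 0th, 1st, 2nd, 4th, 8th, ... repair
    // of a blob (re-enabling retransmission); every other response is stamped
    // with the requester's id so the requester won't rebroadcast it to peers.
    fn response_sender_id(me: Pubkey, requester: Pubkey, leader: Pubkey, num_retransmits: u32) -> Pubkey {
        if leader == me && (num_retransmits == 0 || num_retransmits.is_power_of_two()) {
            me
        } else {
            requester
        }
    }

The db_ledger path always stamps me.id and keeps no per-blob retransmit count, which is also why the run_window_request_with_backoff test is deleted further down.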
@@ -753,17 +718,17 @@ impl ClusterInfo {
     //TODO we should first coalesce all the requests
     fn handle_blob(
         obj: &Arc<RwLock<Self>>,
-        window: &SharedWindow,
-        ledger_window: &mut Option<&mut LedgerWindow>,
+        db_ledger: Option<&Arc<RwLock<DbLedger>>>,
         blob: &Blob,
     ) -> Vec<SharedBlob> {
         deserialize(&blob.data[..blob.meta.size])
             .into_iter()
             .flat_map(|request| {
-                ClusterInfo::handle_protocol(obj, &blob.meta.addr(), request, window, ledger_window)
+                ClusterInfo::handle_protocol(obj, &blob.meta.addr(), db_ledger, request)
             })
             .collect()
     }
 
     fn handle_pull_request(
         me: &Arc<RwLock<Self>>,
         filter: Bloom<Hash>,
@@ -867,10 +832,9 @@
     fn handle_request_window_index(
         me: &Arc<RwLock<Self>>,
         from: &ContactInfo,
+        db_ledger: Option<&Arc<RwLock<DbLedger>>>,
         ix: u64,
         from_addr: &SocketAddr,
-        window: &SharedWindow,
-        ledger_window: &mut Option<&mut LedgerWindow>,
     ) -> Vec<SharedBlob> {
         let now = Instant::now();
@@ -898,15 +862,7 @@
             from.id,
             ix,
         );
-        let res = Self::run_window_request(
-            &from,
-            &from_addr,
-            &window,
-            ledger_window,
-            &my_info,
-            leader_id,
-            ix,
-        );
+        let res = Self::run_window_request(&from, &from_addr, db_ledger, &my_info, leader_id, ix);
         report_time_spent(
             "RequestWindowIndex",
             &now.elapsed(),
@@ -917,9 +873,8 @@
     fn handle_protocol(
         me: &Arc<RwLock<Self>>,
         from_addr: &SocketAddr,
+        db_ledger: Option<&Arc<RwLock<DbLedger>>>,
         request: Protocol,
-        window: &SharedWindow,
-        ledger_window: &mut Option<&mut LedgerWindow>,
     ) -> Vec<SharedBlob> {
         match request {
             // TODO verify messages faster
@@ -960,7 +915,7 @@
                 vec![]
             }
             Protocol::RequestWindowIndex(from, ix) => {
-                Self::handle_request_window_index(me, &from, ix, from_addr, window, ledger_window)
+                Self::handle_request_window_index(me, &from, db_ledger, ix, from_addr)
             }
         }
     }
@@ -968,8 +923,7 @@
     /// Process messages from the network
     fn run_listen(
         obj: &Arc<RwLock<Self>>,
-        window: &SharedWindow,
-        ledger_window: &mut Option<&mut LedgerWindow>,
+        db_ledger: Option<&Arc<RwLock<DbLedger>>>,
         requests_receiver: &BlobReceiver,
         response_sender: &BlobSender,
     ) -> Result<()> {
@@ -981,7 +935,7 @@
         }
         let mut resps = Vec::new();
         for req in reqs {
-            let mut resp = Self::handle_blob(obj, window, ledger_window, &req.read().unwrap());
+            let mut resp = Self::handle_blob(obj, db_ledger, &req.read().unwrap());
             resps.append(&mut resp);
         }
         response_sender.send(resps)?;
@@ -989,21 +943,17 @@
     }
     pub fn listen(
         me: Arc<RwLock<Self>>,
-        window: SharedWindow,
-        ledger_path: Option<&str>,
+        db_ledger: Option<Arc<RwLock<DbLedger>>>,
         requests_receiver: BlobReceiver,
         response_sender: BlobSender,
         exit: Arc<AtomicBool>,
     ) -> JoinHandle<()> {
-        let mut ledger_window = ledger_path.map(|p| LedgerWindow::open(p).unwrap());
-
         Builder::new()
             .name("solana-listen".to_string())
             .spawn(move || loop {
                 let e = Self::run_listen(
                     &me,
-                    &window,
-                    &mut ledger_window.as_mut(),
+                    db_ledger.as_ref(),
                     &requests_receiver,
                     &response_sender,
                 );
@@ -1160,12 +1110,12 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
 mod tests {
     use super::*;
     use crate::crds_value::CrdsValueLabel;
+    use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT};
     use crate::entry::Entry;
     use crate::ledger::{get_tmp_ledger_path, LedgerWindow, LedgerWriter};
     use crate::logger;
     use crate::packet::SharedBlob;
     use crate::result::Error;
-    use crate::window::default_window;
+    use packet::{Blob, BLOB_HEADER_SIZE};
     use solana_sdk::hash::{hash, Hash};
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use std::fs::remove_dir_all;
@@ -1263,140 +1213,62 @@ mod tests {
     #[test]
     fn run_window_request() {
         logger::setup();
-        let window = Arc::new(RwLock::new(default_window()));
-        let me = NodeInfo::new(
-            Keypair::new().pubkey(),
-            socketaddr!("127.0.0.1:1234"),
-            socketaddr!("127.0.0.1:1235"),
-            socketaddr!("127.0.0.1:1236"),
-            socketaddr!("127.0.0.1:1237"),
-            socketaddr!("127.0.0.1:1238"),
-            socketaddr!("127.0.0.1:1239"),
-            0,
-        );
-        let leader_id = me.id;
-        let rv = ClusterInfo::run_window_request(
-            &me,
-            &socketaddr_any!(),
-            &window,
-            &mut None,
-            &me,
-            leader_id,
-            0,
-        );
-        assert!(rv.is_empty());
-        let out = SharedBlob::default();
-        out.write().unwrap().meta.size = 200;
-        window.write().unwrap()[0].data = Some(out);
-        let rv = ClusterInfo::run_window_request(
-            &me,
-            &socketaddr_any!(),
-            &window,
-            &mut None,
-            &me,
-            leader_id,
-            0,
-        );
-        assert!(!rv.is_empty());
-        let v = rv[0].clone();
-        //test we copied the blob
-        assert_eq!(v.read().unwrap().meta.size, 200);
-        let len = window.read().unwrap().len() as u64;
-        let rv = ClusterInfo::run_window_request(
-            &me,
-            &socketaddr_any!(),
-            &window,
-            &mut None,
-            &me,
-            leader_id,
-            len,
-        );
-        assert!(rv.is_empty());
-
-        fn tmp_ledger(name: &str) -> String {
-            let path = get_tmp_ledger_path(name);
-            let mut writer = LedgerWriter::open(&path, true).unwrap();
-            let zero = Hash::default();
-            let one = hash(&zero.as_ref());
-            writer
-                .write_entries(
-                    &vec![
-                        Entry::new_tick(&zero, 0, 0, &zero),
-                        Entry::new_tick(&one, 1, 0, &one),
-                    ]
-                    .to_vec(),
-                )
-                .unwrap();
-            path
-        }
-
-        let ledger_path = tmp_ledger("run_window_request");
-        let mut ledger_window = LedgerWindow::open(&ledger_path).unwrap();
-        let rv = ClusterInfo::run_window_request(
-            &me,
-            &socketaddr_any!(),
-            &window,
-            &mut Some(&mut ledger_window),
-            &me,
-            leader_id,
-            1,
-        );
-        assert!(!rv.is_empty());
-        remove_dir_all(ledger_path).unwrap();
-    }
-
-    /// test window requests respond with the right blob, and do not overrun
-    #[test]
-    fn run_window_request_with_backoff() {
-        let window = Arc::new(RwLock::new(default_window()));
-
-        let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
-        let leader_id = me.id;
-
-        let mock_peer = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
-
-        // Simulate handling a repair request from mock_peer
-        let rv = ClusterInfo::run_window_request(
-            &mock_peer,
-            &socketaddr_any!(),
-            &window,
-            &mut None,
-            &me,
-            leader_id,
-            0,
-        );
-        assert!(rv.is_empty());
-        let blob = SharedBlob::default();
-        let blob_size = 200;
-        blob.write().unwrap().meta.size = blob_size;
-        window.write().unwrap()[0].data = Some(blob);
-
-        let num_requests: u32 = 64;
-        for i in 0..num_requests {
-            let shared_blob = ClusterInfo::run_window_request(
-                &mock_peer,
-                &socketaddr_any!(),
-                &window,
-                &mut None,
-                &me,
-                leader_id,
-                0,
-            )[0]
-            .clone();
-            let blob = shared_blob.read().unwrap();
-            // Test we copied the blob
-            assert_eq!(blob.meta.size, blob_size);
-
-            let id = if i == 0 || i.is_power_of_two() {
-                me.id
-            } else {
-                mock_peer.id
-            };
-            assert_eq!(blob.id().unwrap(), id);
-        }
-    }
+        let ledger_path = get_tmp_ledger_path("run_window_request");
+        {
+            let db_ledger = Arc::new(RwLock::new(DbLedger::open(&ledger_path).unwrap()));
+            let me = NodeInfo::new(
+                Keypair::new().pubkey(),
+                socketaddr!("127.0.0.1:1234"),
+                socketaddr!("127.0.0.1:1235"),
+                socketaddr!("127.0.0.1:1236"),
+                socketaddr!("127.0.0.1:1237"),
+                socketaddr!("127.0.0.1:1238"),
+                socketaddr!("127.0.0.1:1239"),
+                0,
+            );
+            let leader_id = me.id;
+            let rv = ClusterInfo::run_window_request(
+                &me,
+                &socketaddr_any!(),
+                Some(&db_ledger),
+                &me,
+                leader_id,
+                0,
+            );
+            assert!(rv.is_empty());
+
+            let data_size = 1;
+            let blob = SharedBlob::default();
+            {
+                let mut w_blob = blob.write().unwrap();
+                w_blob.set_size(data_size);
+                w_blob.set_index(1).expect("set_index()");
+                w_blob.set_slot(2).expect("set_slot()");
+                w_blob.meta.size = data_size + BLOB_HEADER_SIZE;
+            }
+
+            {
+                let mut w_ledger = db_ledger.write().unwrap();
+                w_ledger
+                    .write_shared_blobs(2, vec![&blob])
+                    .expect("Expect successful ledger write");
+            }
+
+            let rv = ClusterInfo::run_window_request(
+                &me,
+                &socketaddr_any!(),
+                Some(&db_ledger),
+                &me,
+                leader_id,
+                1,
+            );
+            assert!(!rv.is_empty());
+            let v = rv[0].clone();
+            assert_eq!(v.read().unwrap().index().unwrap(), 1);
+            assert_eq!(v.read().unwrap().slot().unwrap(), 2);
+            assert_eq!(v.read().unwrap().meta.size, BLOB_HEADER_SIZE + data_size);
+        }
+
+        DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
+    }
 
     #[test]

View File

@@ -793,6 +793,50 @@ mod tests {
         DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
     }
 
+    #[test]
+    fn test_insert_data_blobs_slots() {
+        let num_blobs = 10;
+        let entries = make_tiny_test_entries(num_blobs);
+        let shared_blobs = entries.to_blobs();
+        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
+        let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
+
+        let ledger_path = get_tmp_ledger_path("test_insert_data_blobs_slots");
+        let ledger = DbLedger::open(&ledger_path).unwrap();
+
+        // Insert last blob into next slot
+        let result = ledger
+            .insert_data_blob(
+                &DataCf::key(DEFAULT_SLOT_HEIGHT + 1, (num_blobs - 1) as u64),
+                blobs.last().unwrap(),
+            )
+            .unwrap();
+        assert_eq!(result.len(), 0);
+
+        // Insert blobs into first slot, check for consecutive blobs
+        for i in (0..num_blobs - 1).rev() {
+            let result = ledger
+                .insert_data_blob(&DataCf::key(DEFAULT_SLOT_HEIGHT, i as u64), blobs[i])
+                .unwrap();
+            let meta = ledger
+                .meta_cf
+                .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))
+                .unwrap()
+                .expect("Expected metadata object to exist");
+            if i != 0 {
+                assert_eq!(result.len(), 0);
+                assert!(meta.consumed == 0 && meta.received == num_blobs as u64);
+            } else {
+                assert_eq!(result, entries);
+                assert!(meta.consumed == num_blobs as u64 && meta.received == num_blobs as u64);
+            }
+        }
+
+        // Destroying database without closing it first is undefined behavior
+        drop(ledger);
+        DbLedger::destroy(&ledger_path).expect("Expected successful database destruction");
+    }
+
     #[test]
     pub fn test_iteration_order() {
         let slot = 0;
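The new test_insert_data_blobs_slots above pins down the insert_data_blob contract that repair serving relies on: inserting a blob out of order yields no entries while the slot still has a gap, and once the blob at index 0 lands the whole consecutive run comes back, with meta.consumed and meta.received tracking the two frontiers. A hedged sketch of a consumer built on that contract; ingest is a hypothetical helper, not part of this commit:

    // Feed blobs in arrival order; collect whatever entry runs become complete.
    fn ingest(ledger: &DbLedger, slot: u64, blobs: &[Blob]) -> Result<Vec<Entry>> {
        let mut entries = Vec::new();
        for blob in blobs {
            let ix = blob.index()?; // blobs carry their own window index
            // Empty Vec while a gap remains; the full consecutive run once it closes.
            entries.extend(ledger.insert_data_blob(&DataCf::key(slot, ix), blob)?);
        }
        Ok(entries)
    }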

View File

@@ -216,6 +216,9 @@ impl Fullnode {
         sigverify_disabled: bool,
         rpc_port: Option<u16>,
     ) -> Self {
+        // Create the RocksDb ledger
+        let db_ledger = Self::make_db_ledger(ledger_path);
+
         let mut rpc_addr = node.info.rpc;
         let mut rpc_pubsub_addr = node.info.rpc_pubsub;
         // Use custom RPC port, if provided (`Some(port)`)
@@ -244,8 +247,7 @@ impl Fullnode {
         let gossip_service = GossipService::new(
             &cluster_info,
-            shared_window.clone(),
-            Some(ledger_path),
+            Some(db_ledger.clone()),
             node.sockets.gossip,
             exit.clone(),
         );
@@ -266,9 +268,6 @@ impl Fullnode {
         cluster_info.write().unwrap().set_leader(scheduled_leader);
 
-        // Create the RocksDb ledger
-        let db_ledger = Self::make_db_ledger(ledger_path);
-
         let node_role = if scheduled_leader != keypair.pubkey() {
             // Start in validator mode.
             let sockets = Sockets {

View File

@@ -1,9 +1,9 @@
 //! The `gossip_service` module implements the network control plane.
 
 use crate::cluster_info::ClusterInfo;
+use crate::db_ledger::DbLedger;
 use crate::service::Service;
 use crate::streamer;
-use crate::window::SharedWindow;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::channel;
@@ -18,8 +18,7 @@ pub struct GossipService {
 impl GossipService {
     pub fn new(
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        window: SharedWindow,
-        ledger_path: Option<&str>,
+        db_ledger: Option<Arc<RwLock<DbLedger>>>,
         gossip_socket: UdpSocket,
         exit: Arc<AtomicBool>,
     ) -> Self {
@@ -36,8 +35,7 @@ impl GossipService {
         let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
         let t_listen = ClusterInfo::listen(
             cluster_info.clone(),
-            window,
-            ledger_path,
+            db_ledger,
             request_receiver,
             response_sender.clone(),
             exit.clone(),
@@ -79,8 +77,7 @@ mod tests {
         let tn = Node::new_localhost();
         let cluster_info = ClusterInfo::new(tn.info.clone());
         let c = Arc::new(RwLock::new(cluster_info));
-        let w = Arc::new(RwLock::new(vec![]));
-        let d = GossipService::new(&c, w, None, tn.sockets.gossip, exit.clone());
+        let d = GossipService::new(&c, None, tn.sockets.gossip, exit.clone());
         d.close().expect("thread join");
     }
 }

View File

@@ -363,7 +363,9 @@ impl Blob {
     }
     pub fn size(&self) -> Result<usize> {
         let size = self.data_size()? as usize;
-        if self.meta.size == size {
+        if size <= BLOB_HEADER_SIZE {
+            Err(Error::BlobError(BlobError::BadState))
+        } else if self.meta.size == size {
             Ok(size - BLOB_HEADER_SIZE)
         } else {
             Err(Error::BlobError(BlobError::BadState))
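The new first branch closes an underflow: size and BLOB_HEADER_SIZE are both usize, so when a corrupt blob's recorded data_size matched meta.size but was no larger than the header, the old code still computed size - BLOB_HEADER_SIZE, which panics in debug builds and wraps to a huge value in release builds. A self-contained illustration with a made-up header size:

    fn main() {
        let blob_header_size: usize = 12; // stand-in for the real BLOB_HEADER_SIZE constant
        let size: usize = 8; // a corrupt blob claiming less data than one header
        assert!(size <= blob_header_size); // the new guard trips, returning BlobError::BadState
        assert_eq!(size.checked_sub(blob_header_size), None); // the old subtraction would underflow
    }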

View File

@@ -99,7 +99,6 @@ impl Replicator {
         const REPLICATOR_WINDOW_SIZE: usize = 32 * 1024;
         let window = window::new_window(REPLICATOR_WINDOW_SIZE);
-        let shared_window = Arc::new(RwLock::new(window));
 
         info!("Replicator: id: {}", keypair.pubkey());
         info!("Creating cluster info....");
@@ -127,8 +126,7 @@
         let gossip_service = GossipService::new(
             &cluster_info,
-            shared_window.clone(),
-            ledger_path,
+            Some(db_ledger.clone()),
             node.sockets.gossip,
             exit.clone(),
         );

View File

@@ -349,14 +349,8 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
     let (node, gossip_socket) = ClusterInfo::spy_node();
     let my_addr = gossip_socket.local_addr().unwrap();
     let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(node)));
-    let window = Arc::new(RwLock::new(vec![]));
-    let gossip_service = GossipService::new(
-        &cluster_info.clone(),
-        window,
-        None,
-        gossip_socket,
-        exit.clone(),
-    );
+    let gossip_service =
+        GossipService::new(&cluster_info.clone(), None, gossip_socket, exit.clone());
 
     let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp);
     cluster_info

View File

@@ -211,7 +211,7 @@ pub mod tests {
         exit: Arc<AtomicBool>,
     ) -> (GossipService, SharedWindow) {
         let window = Arc::new(RwLock::new(window::default_window()));
-        let gossip_service = GossipService::new(&cluster_info, window.clone(), None, gossip, exit);
+        let gossip_service = GossipService::new(&cluster_info, None, gossip, exit);
         (gossip_service, window)
     }

View File

@@ -21,8 +21,7 @@ fn test_node(exit: Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, GossipService,
     let mut tn = Node::new_localhost_with_pubkey(keypair.pubkey());
     let cluster_info = ClusterInfo::new_with_keypair(tn.info.clone(), Arc::new(keypair));
     let c = Arc::new(RwLock::new(cluster_info));
-    let w = Arc::new(RwLock::new(vec![]));
-    let d = GossipService::new(&c.clone(), w, None, tn.sockets.gossip, exit);
+    let d = GossipService::new(&c.clone(), None, tn.sockets.gossip, exit);
     let _ = c.read().unwrap().my_data();
     (c, d, tn.sockets.tvu.pop().unwrap())
 }

View File

@@ -49,10 +49,8 @@ fn make_spy_node(leader: &NodeInfo) -> (GossipService, Arc<RwLock<ClusterInfo>>,
     spy_cluster_info.insert_info(leader.clone());
     spy_cluster_info.set_leader(leader.id);
     let spy_cluster_info_ref = Arc::new(RwLock::new(spy_cluster_info));
-    let spy_window = Arc::new(RwLock::new(default_window()));
     let gossip_service = GossipService::new(
         &spy_cluster_info_ref,
-        spy_window,
         None,
         spy.sockets.gossip,
         exit.clone(),
@@ -73,10 +71,8 @@ fn make_listening_node(
     new_node_cluster_info.insert_info(leader.clone());
     new_node_cluster_info.set_leader(leader.id);
     let new_node_cluster_info_ref = Arc::new(RwLock::new(new_node_cluster_info));
-    let new_node_window = Arc::new(RwLock::new(default_window()));
     let gossip_service = GossipService::new(
         &new_node_cluster_info_ref,
-        new_node_window,
         None,
         new_node
             .sockets