Rob Walker 2018-06-27 12:35:58 -07:00
parent d95e8030fc
commit 2f42658cd4
10 changed files with 66 additions and 53 deletions

View File

@ -87,11 +87,6 @@ pub struct Bank {
/// The number of transactions the bank has processed without error since the
/// start of the ledger.
transaction_count: AtomicUsize,
/// The number of Entries the bank has processed without error since start
/// of the ledger, i.e. poor-man's network synchronization
/// TODO: upgrade to U64 when stable?
entry_count: AtomicUsize,
}
impl Bank {
@ -105,7 +100,6 @@ impl Bank {
time_sources: RwLock::new(HashSet::new()),
last_time: RwLock::new(Utc.timestamp(0, 0)),
transaction_count: AtomicUsize::new(0),
entry_count: AtomicUsize::new(0),
};
bank.apply_payment(deposit, &mut bank.balances.write().unwrap());
bank
@ -302,12 +296,13 @@ impl Bank {
}
/// Process an ordered list of entries.
pub fn process_entries<I>(&self, entries: I) -> Result<usize>
pub fn process_entries<I>(&self, entries: I) -> Result<u64>
where
I: IntoIterator<Item = Entry>,
{
let mut entry_count = 0;
for entry in entries {
self.entry_count.fetch_add(1, Ordering::Relaxed);
entry_count += 1;
if !entry.transactions.is_empty() {
for result in self.process_transactions(entry.transactions) {
@ -321,7 +316,7 @@ impl Bank {
self.register_entry_id(&entry.id);
}
}
Ok(self.entry_count())
Ok(entry_count)
}
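With this change, process_entries reports only the entries consumed by the current call, and the caller keeps the running total. A minimal, self-contained sketch of that accumulation pattern, using a stand-in for the method (the real Bank and Entry types are not modeled here):

// Stand-in with the same shape as Bank::process_entries: it returns how many
// entries this call consumed, as a Result<u64>. The real method also applies
// transactions and registers entry ids; none of that is modeled here.
fn process_entries(entries: Vec<&str>) -> Result<u64, String> {
    Ok(entries.len() as u64)
}

fn main() {
    // Illustrative batches; in the fullnode these come from the input ledger.
    let batches = vec![vec!["e0", "e1", "e2"], vec!["e3", "e4"]];
    let mut entry_height: u64 = 0;
    for batch in batches {
        // Accumulate per-call counts into the ledger's entry height, which is
        // then handed to Server::new_leader / Server::new_validator.
        entry_height += process_entries(batch).expect("process_entries");
    }
    assert_eq!(entry_height, 5);
}
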
/// Process a Witness Signature. Any payment plans waiting on this signature
@ -435,9 +430,6 @@ impl Bank {
pub fn transaction_count(&self) -> usize {
self.transaction_count.load(Ordering::Relaxed)
}
pub fn entry_count(&self) -> usize {
self.entry_count.load(Ordering::Relaxed)
}
}
#[cfg(test)]

View File

@ -101,9 +101,11 @@ fn main() {
bank.register_entry_id(&entry0.id);
bank.register_entry_id(&entry1.id);
// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
eprintln!("processing entries...");
let num_entries = bank.process_entries(entries).expect("process_entries");
eprintln!("processed {} entries...", num_entries);
let entry_height = bank.process_entries(entries).expect("process_entries");
eprintln!("processed {} entries...", entry_height);
eprintln!("creating networking stack...");
@ -135,6 +137,7 @@ fn main() {
let network_entry_point = ReplicatedData::new_entry_point(testnet_addr);
let s = Server::new_validator(
bank,
entry_height,
repl_data.clone(),
UdpSocket::bind(repl_data.requests_addr).unwrap(),
UdpSocket::bind("0.0.0.0:0").unwrap(),
@ -160,6 +163,7 @@ fn main() {
let server = Server::new_leader(
bank,
entry_height,
//Some(Duration::from_millis(1000)),
None,
repl_data.clone(),

View File

@ -727,7 +727,10 @@ impl Crdt {
}
} else {
assert!(window.read().unwrap()[pos].is_none());
info!("failed RequestWindowIndex {} {}", ix, from.repair_addr);
info!(
"failed RequestWindowIndex {} {} {}",
ix, pos, from.repair_addr
);
}
None

View File

@ -248,6 +248,7 @@ mod tests {
let server = Server::new_leader(
bank,
0,
Some(Duration::from_millis(30)),
leader.data.clone(),
leader.sockets.requests,

View File

@ -46,6 +46,7 @@ impl Server {
/// ```
pub fn new_leader<W: Write + Send + 'static>(
bank: Bank,
entry_height: u64,
tick_duration: Option<Duration>,
me: ReplicatedData,
requests_socket: UdpSocket,
@ -89,9 +90,9 @@ impl Server {
exit.clone(),
crdt,
window,
entry_height,
blob_recycler.clone(),
tpu.blob_receiver,
bank.entry_count(),
);
thread_hdls.extend(vec![t_broadcast]);
@ -129,6 +130,7 @@ impl Server {
/// ```
pub fn new_validator(
bank: Bank,
entry_height: u64,
me: ReplicatedData,
requests_socket: UdpSocket,
respond_socket: UdpSocket,
@ -160,6 +162,7 @@ impl Server {
let tvu = Tvu::new(
bank.clone(),
entry_height,
crdt.clone(),
window.clone(),
replicate_socket,
@ -188,6 +191,7 @@ mod tests {
let exit = Arc::new(AtomicBool::new(false));
let v = Server::new_validator(
bank,
0,
tn.data.clone(),
tn.sockets.requests,
tn.sockets.respond,

View File

@ -16,7 +16,7 @@ use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
pub const WINDOW_SIZE: usize = 2 * 1024;
pub const WINDOW_SIZE: u64 = 2 * 1024;
pub type PacketReceiver = Receiver<SharedPackets>;
pub type PacketSender = Sender<SharedPackets>;
pub type BlobSender = Sender<SharedBlobs>;
@ -148,8 +148,8 @@ pub fn blob_receiver(
fn find_next_missing(
locked_window: &Window,
crdt: &Arc<RwLock<Crdt>>,
consumed: &mut usize,
received: &mut usize,
consumed: &mut u64,
received: &mut u64,
) -> Result<Vec<(SocketAddr, Vec<u8>)>> {
if *received <= *consumed {
return Err(Error::GenericError);
@ -157,7 +157,7 @@ fn find_next_missing(
let window = locked_window.read().unwrap();
let reqs: Vec<_> = (*consumed..*received)
.filter_map(|pix| {
let i = pix % WINDOW_SIZE;
let i = (pix % WINDOW_SIZE) as usize;
if let &None = &window[i] {
let val = crdt.read().unwrap().window_index_request(pix as u64);
if let Ok((to, req)) = val {
@ -174,18 +174,18 @@ fn repair_window(
locked_window: &Window,
crdt: &Arc<RwLock<Crdt>>,
_recycler: &BlobRecycler,
last: &mut usize,
last: &mut u64,
times: &mut usize,
consumed: &mut usize,
received: &mut usize,
consumed: &mut u64,
received: &mut u64,
) -> Result<()> {
#[cfg(feature = "erasure")]
{
if erasure::recover(
_recycler,
&mut locked_window.write().unwrap(),
*consumed,
*received,
*consumed as usize,
*received as usize,
).is_err()
{
trace!("erasure::recover failed");
@ -217,8 +217,8 @@ fn recv_window(
locked_window: &Window,
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
consumed: &mut usize,
received: &mut usize,
consumed: &mut u64,
received: &mut u64,
r: &BlobReceiver,
s: &BlobSender,
retransmit: &BlobSender,
@ -273,7 +273,7 @@ fn recv_window(
while let Some(b) = dq.pop_front() {
let (pix, meta_size) = {
let p = b.write().expect("'b' write lock in fn recv_window");
(p.get_index()? as usize, p.meta.size)
(p.get_index()?, p.meta.size)
};
if pix > *received {
*received = pix;
@ -287,7 +287,7 @@ fn recv_window(
);
continue;
}
let w = pix % WINDOW_SIZE;
let w = (pix % WINDOW_SIZE) as usize;
//TODO, after the blocks are authenticated
//if we get different blocks at the same index
//that is a network failure/attack
@ -304,7 +304,7 @@ fn recv_window(
}
}
loop {
let k = *consumed % WINDOW_SIZE;
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("k: {} consumed: {}", k, *consumed);
if window[k].is_none() {
break;
@ -330,19 +330,21 @@ fn recv_window(
} else {
#[cfg(feature = "erasure")]
{
let block_start = *consumed - (*consumed % erasure::NUM_CODED);
let coding_end = block_start + erasure::NUM_CODED;
let block_start = *consumed - (*consumed % erasure::NUM_CODED as u64);
let coding_end = block_start + erasure::NUM_CODED as u64;
// We've received all this block's data blobs, go and null out the window now
for j in block_start..*consumed {
if let Some(b) = mem::replace(&mut window[j % WINDOW_SIZE], None) {
if let Some(b) =
mem::replace(&mut window[(j % WINDOW_SIZE) as usize], None)
{
recycler.recycle(b);
}
}
for j in *consumed..coding_end {
window[j % WINDOW_SIZE] = None;
window[(j % WINDOW_SIZE) as usize] = None;
}
*consumed += erasure::MAX_MISSING;
*consumed += erasure::MAX_MISSING as u64;
debug!(
"skipping processing coding blob k: {} consumed: {}",
k, *consumed
@ -361,7 +363,7 @@ fn recv_window(
Ok(())
}
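For reference, the erasure branch above rounds consumed down to the start of its coding block before recycling and nulling slots. A small self-contained sketch of that block arithmetic, using an illustrative block size (the real NUM_CODED and MAX_MISSING constants live in the erasure module):

// Illustrative block size only; the erasure module defines the real value.
const NUM_CODED: u64 = 8;

// Mirrors the arithmetic in recv_window: round `consumed` down to its coding
// block's first index, and compute one past the block's last slot.
fn coding_block_bounds(consumed: u64) -> (u64, u64) {
    let block_start = consumed - (consumed % NUM_CODED);
    let coding_end = block_start + NUM_CODED;
    (block_start, coding_end)
}

fn main() {
    // e.g. consumed = 21: data slots 16..21 get recycled, 21..24 get nulled,
    // then consumed jumps ahead so the coding blobs are not re-processed.
    assert_eq!(coding_block_bounds(21), (16, 24));
}
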
fn print_window(locked_window: &Window, consumed: usize) {
fn print_window(locked_window: &Window, consumed: u64) {
{
let buf: Vec<_> = locked_window
.read()
@ -369,7 +371,7 @@ fn print_window(locked_window: &Window, consumed: usize) {
.iter()
.enumerate()
.map(|(i, v)| {
if i == (consumed % WINDOW_SIZE) {
if i == (consumed % WINDOW_SIZE) as usize {
"_"
} else if v.is_none() {
"0"
@ -391,25 +393,25 @@ fn print_window(locked_window: &Window, consumed: usize) {
}
pub fn default_window() -> Window {
Arc::new(RwLock::new(vec![None; WINDOW_SIZE]))
Arc::new(RwLock::new(vec![None; WINDOW_SIZE as usize]))
}
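Throughout this file, blob indices are u64 values that grow without bound while the window is a fixed ring of WINDOW_SIZE slots, so every access maps the index with a modulo and casts to usize only at the point of slot lookup. A minimal self-contained sketch of that ring indexing (the slot type here stands in for the real blob type):

// Mirrors the constant above; the slot type is a stand-in for Option<SharedBlob>.
const WINDOW_SIZE: u64 = 2 * 1024;
type Slot = Option<String>;

// Map an unbounded u64 blob index onto a fixed ring position.
fn slot_for(window: &mut Vec<Slot>, pix: u64) -> &mut Slot {
    let i = (pix % WINDOW_SIZE) as usize;
    &mut window[i]
}

fn main() {
    let mut window: Vec<Slot> = vec![None; WINDOW_SIZE as usize];
    *slot_for(&mut window, 3_000) = Some("blob 3000".to_string());
    // Index 3000 and index 3000 + WINDOW_SIZE share the same slot.
    assert!(slot_for(&mut window, 3_000 + WINDOW_SIZE).is_some());
}
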
pub fn window(
exit: Arc<AtomicBool>,
crdt: Arc<RwLock<Crdt>>,
window: Window,
entry_height: u64,
recycler: BlobRecycler,
r: BlobReceiver,
s: BlobSender,
retransmit: BlobSender,
entry_count: usize,
) -> JoinHandle<()> {
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
let mut consumed = entry_count;
let mut received = entry_count;
let mut last = entry_count;
let mut consumed = entry_height;
let mut received = entry_height;
let mut last = entry_height;
let mut times = 0;
loop {
if exit.load(Ordering::Relaxed) {
@ -459,9 +461,9 @@ fn broadcast(
// We could receive more blobs than window slots so
// break them up into window-sized chunks to process
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE).map(|x| x.to_vec());
let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec());
print_window(window, *receive_index as usize);
print_window(window, *receive_index);
for mut blobs in blobs_chunked {
// Insert the coding blobs into the blob stream
@ -479,7 +481,7 @@ fn broadcast(
assert!(blobs.len() <= win.len());
for b in &blobs {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
let pos = (ix % WINDOW_SIZE) as usize;
if let Some(x) = mem::replace(&mut win[pos], None) {
trace!(
"popped {} at {}",
@ -492,7 +494,7 @@ fn broadcast(
}
while let Some(b) = blobs.pop() {
let ix = b.read().unwrap().get_index().expect("blob index");
let pos = (ix as usize) % WINDOW_SIZE;
let pos = (ix % WINDOW_SIZE) as usize;
trace!("caching {} at {}", ix, pos);
assert!(win[pos].is_none());
win[pos] = Some(b);
@ -531,15 +533,15 @@ pub fn broadcaster(
exit: Arc<AtomicBool>,
crdt: Arc<RwLock<Crdt>>,
window: Window,
entry_height: u64,
recycler: BlobRecycler,
r: BlobReceiver,
entry_count: usize,
) -> JoinHandle<()> {
Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
let mut transmit_index = entry_count as u64;
let mut receive_index = entry_count as u64;
let mut transmit_index = entry_height;
let mut receive_index = entry_height;
loop {
if exit.load(Ordering::Relaxed) {
break;
@ -825,11 +827,11 @@ mod test {
exit.clone(),
subs,
win,
0,
resp_recycler.clone(),
r_reader,
s_window,
s_retransmit,
0,
);
let (s_responder, r_responder) = channel();
let t_responder = responder(

View File

@ -205,6 +205,7 @@ mod tests {
let server = Server::new_leader(
bank,
0,
Some(Duration::from_millis(30)),
leader.data.clone(),
leader.sockets.requests,
@ -249,6 +250,7 @@ mod tests {
let server = Server::new_leader(
bank,
0,
Some(Duration::from_millis(30)),
leader.data.clone(),
leader.sockets.requests,

View File

@ -55,6 +55,7 @@ impl Tvu {
/// on the bank state.
/// # Arguments
/// * `bank` - The bank state.
/// * `entry_height` - Initial ledger height, passed to the window stage
/// * `crdt` - The crdt state.
/// * `window` - The window state.
/// * `replicate_socket` - my replicate socket
@ -63,6 +64,7 @@ impl Tvu {
/// * `exit` - The exit signal.
pub fn new(
bank: Arc<Bank>,
entry_height: u64,
crdt: Arc<RwLock<Crdt>>,
window: Window,
replicate_socket: UdpSocket,
@ -82,11 +84,11 @@ impl Tvu {
let window_stage = WindowStage::new(
crdt,
window,
entry_height,
retransmit_socket,
exit.clone(),
blob_recycler.clone(),
fetch_stage.blob_receiver,
bank.entry_count(),
);
let replicate_stage =
@ -194,6 +196,7 @@ pub mod tests {
let tvu = Tvu::new(
bank.clone(),
0,
cref1,
dr_1.1,
target1.sockets.replicate,

View File

@ -18,11 +18,11 @@ impl WindowStage {
pub fn new(
crdt: Arc<RwLock<Crdt>>,
window: Window,
entry_height: u64,
retransmit_socket: UdpSocket,
exit: Arc<AtomicBool>,
blob_recycler: BlobRecycler,
fetch_stage_receiver: BlobReceiver,
entry_count: usize,
) -> Self {
let (retransmit_sender, retransmit_receiver) = channel();
@ -38,11 +38,11 @@ impl WindowStage {
exit.clone(),
crdt.clone(),
window,
entry_height,
blob_recycler.clone(),
fetch_stage_receiver,
blob_sender,
retransmit_sender,
entry_count,
);
let thread_hdls = vec![t_retransmit, t_window];

View File

@ -32,6 +32,7 @@ fn validator(
let replicant_bank = Bank::new(&alice);
let mut ts = Server::new_validator(
replicant_bank,
0,
validator.data.clone(),
validator.sockets.requests,
validator.sockets.respond,
@ -105,6 +106,7 @@ fn test_multi_node() {
let leader_bank = Bank::new(&alice);
let server = Server::new_leader(
leader_bank,
0,
None,
leader.data.clone(),
leader.sockets.requests,