rework read_ledger, LedgerWriter, and LedgerWindow for recover()

fixes #910
This commit is contained in:
Rob Walker 2018-08-10 17:56:08 -07:00
parent 58f220a3b7
commit 96d6985895
9 changed files with 70 additions and 47 deletions

View File

@ -55,7 +55,7 @@ fn main() -> Result<(), Box<error::Error>> {
let pkcs8: Vec<u8> = serde_json::from_str(&buffer)?;
let mint = Mint::new_with_pkcs8(tokens, pkcs8);
let mut ledger_writer = LedgerWriter::new(&ledger_path, true)?;
let mut ledger_writer = LedgerWriter::open(&ledger_path, true)?;
ledger_writer.write_entries(mint.create_entries())?;
Ok(())

View File

@ -28,7 +28,7 @@ fn main() {
.get_matches();
let ledger_path = matches.value_of("ledger").unwrap();
let entries = match read_ledger(ledger_path) {
let entries = match read_ledger(ledger_path, true) {
Ok(entries) => entries,
Err(err) => {
println!("Failed to open ledger at {}: {}", ledger_path, err);

View File

@ -1211,7 +1211,7 @@ impl Crdt {
) -> JoinHandle<()> {
let debug_id = obj.read().unwrap().debug_id();
let mut ledger_window = ledger_path.map(|p| LedgerWindow::new(p).unwrap());
let mut ledger_window = ledger_path.map(|p| LedgerWindow::open(p).unwrap());
Builder::new()
.name("solana-listen".to_string())
@ -1914,7 +1914,7 @@ mod tests {
let path = format!("/tmp/farf/{}-{}", name, keypair.pubkey());
let mut writer = LedgerWriter::new(&path, true).unwrap();
let mut writer = LedgerWriter::open(&path, true).unwrap();
let zero = Hash::default();
let one = hash(&zero.as_ref());
writer
@ -1924,7 +1924,7 @@ mod tests {
}
let ledger_path = tmp_ledger("run_window_request");
let mut ledger_window = LedgerWindow::new(&ledger_path).unwrap();
let mut ledger_window = LedgerWindow::open(&ledger_path).unwrap();
let rv = Crdt::run_window_request(
&window,

View File

@ -60,7 +60,7 @@ impl FullNode {
info!("creating bank...");
let bank = Bank::new_default(leader);
let entries = read_ledger(ledger_path).expect("opening ledger");
let entries = read_ledger(ledger_path, true).expect("opening ledger");
let entries = entries.map(|e| e.expect("failed to parse entry"));

View File

@ -93,11 +93,10 @@ fn u64_at<A: Read + Seek>(file: &mut A, at: u64) -> io::Result<u64> {
impl LedgerWindow {
// opens a Ledger in directory, provides "infinite" window
pub fn new(ledger_path: &str) -> io::Result<Self> {
//
pub fn open(ledger_path: &str) -> io::Result<Self> {
let ledger_path = Path::new(&ledger_path);
recover_ledger(ledger_path)?;
let index = File::open(ledger_path.join("index"))?;
let index = BufReader::with_capacity((WINDOW_SIZE * SIZEOF_U64) as usize, index);
let data = File::open(ledger_path.join("data"))?;
@ -112,13 +111,9 @@ impl LedgerWindow {
}
}
pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> {
pub fn verify_ledger(ledger_path: &str) -> io::Result<()> {
let ledger_path = Path::new(&ledger_path);
if recover {
recover_ledger(ledger_path)?;
}
let index = File::open(ledger_path.join("index"))?;
let index_len = index.metadata()?.len();
@ -150,10 +145,22 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> {
))?;
}
let entry = entry_at(&mut data, data_offset)?;
last_len = serialized_size(&entry).map_err(err_bincode_to_io)? + SIZEOF_U64;
last_data_offset = data_offset;
match entry_at(&mut data, data_offset) {
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,
format!(
"entry[{}] deserialize() failed at offset {}, err: {}",
index_offset / SIZEOF_U64,
data_offset,
e.to_string(),
),
))?,
Ok(entry) => {
last_len = serialized_size(&entry).map_err(err_bincode_to_io)? + SIZEOF_U64
}
}
last_data_offset = data_offset;
data_read += last_len;
index_offset += SIZEOF_U64;
}
@ -167,7 +174,8 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> {
Ok(())
}
fn recover_ledger(ledger_path: &Path) -> io::Result<()> {
fn recover_ledger(ledger_path: &str) -> io::Result<()> {
let ledger_path = Path::new(ledger_path);
let mut index = OpenOptions::new()
.write(true)
.read(true)
@ -261,16 +269,21 @@ pub struct LedgerWriter {
}
impl LedgerWriter {
// recover and open the ledger for writing
pub fn recover(ledger_path: &str) -> io::Result<Self> {
recover_ledger(ledger_path)?;
LedgerWriter::open(ledger_path, false)
}
// opens or creates a LedgerWriter in ledger_path directory
pub fn new(ledger_path: &str, create: bool) -> io::Result<Self> {
pub fn open(ledger_path: &str, create: bool) -> io::Result<Self> {
let ledger_path = Path::new(&ledger_path);
if create {
let _ignored = remove_dir_all(ledger_path);
create_dir_all(ledger_path)?;
} else {
recover_ledger(ledger_path)?;
}
let index = OpenOptions::new()
.create(create)
.append(true)
@ -360,11 +373,15 @@ impl Iterator for LedgerReader {
}
/// Return an iterator for all the entries in the given file.
pub fn read_ledger(ledger_path: &str) -> io::Result<impl Iterator<Item = io::Result<Entry>>> {
pub fn read_ledger(
ledger_path: &str,
recover: bool,
) -> io::Result<impl Iterator<Item = io::Result<Entry>>> {
if recover {
recover_ledger(ledger_path)?;
}
let ledger_path = Path::new(&ledger_path);
recover_ledger(ledger_path)?;
let data = File::open(ledger_path.join("data"))?;
let data = BufReader::new(data);
@ -682,21 +699,21 @@ mod tests {
let entries = make_tiny_test_entries(10);
{
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(entries.clone()).unwrap();
// drops writer, flushes buffers
}
verify_ledger(&ledger_path, false).unwrap();
verify_ledger(&ledger_path).unwrap();
let mut read_entries = vec![];
for x in read_ledger(&ledger_path).unwrap() {
for x in read_ledger(&ledger_path, true).unwrap() {
let entry = x.unwrap();
trace!("entry... {:?}", entry);
read_entries.push(entry);
}
assert_eq!(read_entries, entries);
let mut window = LedgerWindow::new(&ledger_path).unwrap();
let mut window = LedgerWindow::open(&ledger_path).unwrap();
for (i, entry) in entries.iter().enumerate() {
let read_entry = window.get_entry(i as u64).unwrap();
@ -706,19 +723,19 @@ mod tests {
std::fs::remove_file(Path::new(&ledger_path).join("data")).unwrap();
// empty data file should fall over
assert!(LedgerWindow::new(&ledger_path).is_err());
assert!(read_ledger(&ledger_path).is_err());
assert!(LedgerWindow::open(&ledger_path).is_err());
assert!(read_ledger(&ledger_path, false).is_err());
std::fs::remove_dir_all(ledger_path).unwrap();
}
fn truncated_last_entry(ledger_path: &str, entries: Vec<Entry>) {
let len = {
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(entries).unwrap();
writer.data.seek(SeekFrom::Current(0)).unwrap()
};
verify_ledger(&ledger_path, false).unwrap();
verify_ledger(&ledger_path).unwrap();
let data = OpenOptions::new()
.write(true)
@ -728,13 +745,13 @@ mod tests {
}
fn garbage_on_data(ledger_path: &str, entries: Vec<Entry>) {
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(entries).unwrap();
writer.data.write_all(b"hi there!").unwrap();
}
fn read_ledger_check(ledger_path: &str, entries: Vec<Entry>, len: usize) {
let read_entries = read_ledger(&ledger_path).unwrap();
let read_entries = read_ledger(&ledger_path, true).unwrap();
let mut i = 0;
for entry in read_entries {
@ -745,7 +762,7 @@ mod tests {
}
fn ledger_window_check(ledger_path: &str, entries: Vec<Entry>, len: usize) {
let mut window = LedgerWindow::new(&ledger_path).unwrap();
let mut window = LedgerWindow::open(&ledger_path).unwrap();
for i in 0..len {
let entry = window.get_entry(i as u64);
assert_eq!(entry.unwrap(), entries[i]);
@ -770,11 +787,14 @@ mod tests {
// restore last entry, tests recover_ledger() inside LedgerWriter::new()
truncated_last_entry(&ledger_path, entries.clone());
// verify should fail at first
assert!(verify_ledger(&ledger_path).is_err());
{
let mut writer = LedgerWriter::new(&ledger_path, false).unwrap();
let mut writer = LedgerWriter::recover(&ledger_path).unwrap();
writer.write_entry(&entries[entries.len() - 1]).unwrap();
}
verify_ledger(&ledger_path, false).unwrap();
// and be fine after recover()
verify_ledger(&ledger_path).unwrap();
read_ledger_check(&ledger_path, entries.clone(), entries.len());
ledger_window_check(&ledger_path, entries.clone(), entries.len());
@ -789,11 +809,12 @@ mod tests {
// make it look like data is newer in time, check writer...
garbage_on_data(&ledger_path, entries[..entries.len() - 1].to_vec());
assert!(verify_ledger(&ledger_path).is_err());
{
let mut writer = LedgerWriter::new(&ledger_path, false).unwrap();
let mut writer = LedgerWriter::recover(&ledger_path).unwrap();
writer.write_entry(&entries[entries.len() - 1]).unwrap();
}
verify_ledger(&ledger_path, false).unwrap();
verify_ledger(&ledger_path).unwrap();
read_ledger_check(&ledger_path, entries.clone(), entries.len());
ledger_window_check(&ledger_path, entries.clone(), entries.len());
let _ignored = remove_dir_all(&ledger_path);
@ -807,11 +828,13 @@ mod tests {
let entries = make_tiny_test_entries(10);
let ledger_path = tmp_ledger_path("test_verify_ledger");
{
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(entries.clone()).unwrap();
}
// TODO more cases that make ledger_verify() fail
// assert!(verify_ledger(&ledger_path).is_err());
assert!(verify_ledger(&ledger_path, false).is_ok());
assert!(verify_ledger(&ledger_path).is_ok());
let _ignored = remove_dir_all(&ledger_path);
}

View File

@ -98,7 +98,7 @@ impl ReplicateStage {
exit,
);
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::new(p, false).unwrap());
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::open(p, false).unwrap());
let t_replicate = Builder::new()
.name("solana-replicate-stage".to_string())

View File

@ -302,7 +302,7 @@ mod tests {
let path = format!("/tmp/tmp-ledger-{}-{}", name, keypair.pubkey());
let mut writer = LedgerWriter::new(&path, true).unwrap();
let mut writer = LedgerWriter::open(&path, true).unwrap();
writer.write_entries(mint.create_entries()).unwrap();
path

View File

@ -86,7 +86,7 @@ impl WriteStage {
vote_blob_receiver,
);
let (blob_sender, blob_receiver) = channel();
let mut ledger_writer = LedgerWriter::new(ledger_path, false).unwrap();
let mut ledger_writer = LedgerWriter::recover(ledger_path).unwrap();
let thread_hdl = Builder::new()
.name("solana-writer".to_string())

View File

@ -88,7 +88,7 @@ fn genesis(name: &str, num: i64) -> (Mint, String) {
let mint = Mint::new(num);
let path = tmp_ledger_path(name);
let mut writer = LedgerWriter::new(&path, true).unwrap();
let mut writer = LedgerWriter::open(&path, true).unwrap();
writer.write_entries(mint.create_entries()).unwrap();
@ -141,7 +141,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
// and force him to respond to repair from the ledger window
{
let entries = make_tiny_test_entries(alice.last_id(), WINDOW_SIZE as usize * 2);
let mut writer = LedgerWriter::new(&leader_ledger_path, false).unwrap();
let mut writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
writer.write_entries(entries).unwrap();
}