Problem: potential loss of database updates

chance of loss of database updates that would cause it to redo
transactions it already did.

Let's say we've got some database updates through deposit relaying:
https://github.com/poanetwork/poa-bridge/blob/master/bridge/src/bridge/mod.rs#L164-L165

Then, during relaying withdrawals, an error happened:
https://github.com/poanetwork/poa-bridge/blob/master/bridge/src/bridge/mod.rs#L171-L172

This means that we won't reach
https://github.com/poanetwork/poa-bridge/blob/master/bridge/src/bridge/mod.rs#L185-L193
to save the result.

Also, in a similar vein, if one of these streams was to end
(`Ready(None)`) we'd experience a similar loss of updates:
https://github.com/poanetwork/poa-bridge/blob/master/bridge/src/macros.rs#L5

Solution: refactor bridge into two streams, splitting responsibilities

One stream (`BridgeEventStream`) returns `BridgeChecked` and the other
(`Bridge`) writes those checks down to the database.

This way we're not accumulating chcecks before we serialize them,
risking not serializing them at all in the event of an unrelated error.

Fixes #84
This commit is contained in:
Yurii Rashkovskii 2018-05-25 00:24:16 -07:00
parent 277ac03e00
commit af81eb0d57
No known key found for this signature in database
GPG Key ID: 1D60D7CFD80845FF
4 changed files with 101 additions and 114 deletions

View File

@ -11,6 +11,7 @@ use util::web3_filter;
use app::App;
use ethcore_transaction::{Transaction, Action};
use super::nonce::{NonceCheck, SendRawTransaction};
use super::BridgeChecked;
use itertools::Itertools;
fn deposits_filter(home: &home::HomeBridge, address: Address) -> FilterBuilder {
@ -72,7 +73,7 @@ pub struct DepositRelay<T: Transport> {
}
impl<T: Transport> Stream for DepositRelay<T> {
type Item = u64;
type Item = BridgeChecked;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -126,7 +127,7 @@ impl<T: Transport> Stream for DepositRelay<T> {
},
DepositRelayState::Yield(ref mut block) => match block.take() {
None => DepositRelayState::Wait,
some => return Ok(some.into()),
Some(v) => return Ok(Some(BridgeChecked::DepositRelay(v)).into()),
}
};
self.state = next_state;

View File

@ -15,7 +15,7 @@ use web3::Transport;
use web3::types::U256;
use app::App;
use database::Database;
use error::{Error, ErrorKind, Result};
use error::{Error, ErrorKind};
use tokio_core::reactor::Handle;
pub use self::deploy::{Deploy, Deployed, create_deploy};
@ -34,57 +34,51 @@ pub enum BridgeChecked {
WithdrawConfirm(u64),
}
pub trait BridgeBackend {
fn save(&mut self, checks: Vec<BridgeChecked>) -> Result<()>;
}
pub struct FileBackend {
pub struct Bridge<ES: Stream<Item = BridgeChecked>> {
path: PathBuf,
database: Database,
event_stream: ES,
}
impl BridgeBackend for FileBackend {
fn save(&mut self, checks: Vec<BridgeChecked>) -> Result<()> {
for check in checks {
match check {
BridgeChecked::DepositRelay(n) => {
self.database.checked_deposit_relay = n;
},
BridgeChecked::WithdrawRelay(n) => {
self.database.checked_withdraw_relay = n;
},
BridgeChecked::WithdrawConfirm(n) => {
self.database.checked_withdraw_confirm = n;
},
}
}
impl<ES: Stream<Item = BridgeChecked, Error = Error>> Stream for Bridge<ES> {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let check = try_stream!(self.event_stream.poll());
match check {
BridgeChecked::DepositRelay(n) => {
self.database.checked_deposit_relay = n;
},
BridgeChecked::WithdrawRelay(n) => {
self.database.checked_withdraw_relay = n;
},
BridgeChecked::WithdrawConfirm(n) => {
self.database.checked_withdraw_confirm = n;
},
}
let file = fs::OpenOptions::new()
.write(true)
.create(true)
.open(&self.path)?;
self.database.save(file)
self.database.save(file)?;
Ok(Async::Ready(Some(())))
}
}
enum BridgeStatus {
Wait,
NextItem(Option<()>),
}
/// Creates new bridge.
pub fn create_bridge<T: Transport + Clone>(app: Arc<App<T>>, init: &Database, handle: &Handle, home_chain_id: u64, foreign_chain_id: u64) -> Bridge<T, FileBackend> {
let backend = FileBackend {
pub fn create_bridge<'a, T: Transport + 'a + Clone>(app: Arc<App<T>>, init: &Database, handle: &Handle, home_chain_id: u64, foreign_chain_id: u64) -> Bridge<BridgeEventStream<'a, T>> {
Bridge {
path: app.database_path.clone(),
database: init.clone(),
};
create_bridge_backed_by(app, init, backend, handle, home_chain_id, foreign_chain_id)
event_stream: create_bridge_event_stream(app, init, handle, home_chain_id, foreign_chain_id),
}
}
/// Creates new bridge writing to custom backend.
pub fn create_bridge_backed_by<T: Transport + Clone, F: BridgeBackend>(app: Arc<App<T>>, init: &Database, backend: F, handle: &Handle, home_chain_id: u64, foreign_chain_id: u64) -> Bridge<T, F> {
pub fn create_bridge_event_stream<'a, T: Transport + 'a + Clone>(app: Arc<App<T>>, init: &Database, handle: &Handle, home_chain_id: u64, foreign_chain_id: u64) -> BridgeEventStream<'a, T> {
let home_balance = Arc::new(RwLock::new(None));
let foreign_balance = Arc::new(RwLock::new(None));
@ -105,16 +99,22 @@ pub fn create_bridge_backed_by<T: Transport + Clone, F: BridgeBackend>(app: Arc<
let home_gas_price = Arc::new(RwLock::new(app.config.home.default_gas_price));
let foreign_gas_price = Arc::new(RwLock::new(app.config.foreign.default_gas_price));
Bridge {
let deposit_relay = create_deposit_relay(app.clone(), init, foreign_balance.clone(), foreign_chain_id, foreign_gas_price.clone())
.map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "deposit_relay").into());
let withdraw_relay = create_withdraw_relay(app.clone(), init, home_balance.clone(), home_chain_id, home_gas_price.clone())
.map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "withdraw_relay").into());
let withdraw_confirm = create_withdraw_confirm(app.clone(), init, foreign_balance.clone(), foreign_chain_id, foreign_gas_price.clone())
.map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "withdraw_confirm").into());
let bridge = Box::new(deposit_relay.select(withdraw_relay).select(withdraw_confirm));
BridgeEventStream {
foreign_balance_check: create_balance_check(app.clone(), app.connections.foreign.clone(), app.config.foreign.clone()),
home_balance_check: create_balance_check(app.clone(), app.connections.home.clone(), app.config.home.clone()),
foreign_balance: foreign_balance.clone(),
home_balance: home_balance.clone(),
deposit_relay: create_deposit_relay(app.clone(), init, foreign_balance.clone(), foreign_chain_id, foreign_gas_price.clone()),
withdraw_relay: create_withdraw_relay(app.clone(), init, home_balance.clone(), home_chain_id, home_gas_price.clone()),
withdraw_confirm: create_withdraw_confirm(app.clone(), init, foreign_balance.clone(), foreign_chain_id, foreign_gas_price.clone()),
state: BridgeStatus::Wait,
backend,
bridge,
state: BridgeStatus::Init,
running: app.running.clone(),
home_gas_stream,
foreign_gas_stream,
@ -123,16 +123,19 @@ pub fn create_bridge_backed_by<T: Transport + Clone, F: BridgeBackend>(app: Arc<
}
}
pub struct Bridge<T: Transport, F> {
enum BridgeStatus {
Init,
Wait,
NextItem(Option<BridgeChecked>),
}
pub struct BridgeEventStream<'a, T: Transport + 'a> {
home_balance_check: BalanceCheck<T>,
foreign_balance_check: BalanceCheck<T>,
home_balance: Arc<RwLock<Option<U256>>>,
foreign_balance: Arc<RwLock<Option<U256>>>,
deposit_relay: DepositRelay<T>,
withdraw_relay: WithdrawRelay<T>,
withdraw_confirm: WithdrawConfirm<T>,
bridge: Box<Stream<Item = BridgeChecked, Error = Error> + 'a>,
state: BridgeStatus,
backend: F,
running: Arc<AtomicBool>,
home_gas_stream: Option<GasPriceStream>,
foreign_gas_stream: Option<GasPriceStream>,
@ -142,7 +145,7 @@ pub struct Bridge<T: Transport, F> {
use std::sync::atomic::{AtomicBool, Ordering};
impl<T: Transport, F: BridgeBackend> Bridge<T, F> {
impl<'a, T: Transport + 'a> BridgeEventStream<'a, T> {
fn check_balances(&mut self) -> Poll<Option<()>, Error> {
let mut home_balance = self.home_balance.write().unwrap();
let mut foreign_balance = self.foreign_balance.write().unwrap();
@ -178,70 +181,36 @@ impl<T: Transport, F: BridgeBackend> Bridge<T, F> {
}
}
impl<T: Transport, F: BridgeBackend> Stream for Bridge<T, F> {
type Item = ();
impl<'a, T: Transport + 'a> Stream for BridgeEventStream<'a, T> {
type Item = BridgeChecked;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let next_state = match self.state {
BridgeStatus::Init => {
match self.check_balances()? {
Async::NotReady => return Ok(Async::NotReady),
_ => (),
}
BridgeStatus::Wait
},
BridgeStatus::Wait => {
if !self.running.load(Ordering::SeqCst) {
return Err(ErrorKind::ShutdownRequested.into())
}
// Intended to be used upon startup
let balance_is_absent = {
let mut home_balance = self.home_balance.read().unwrap();
let mut foreign_balance = self.foreign_balance.read().unwrap();
home_balance.is_none() || foreign_balance.is_none()
};
if balance_is_absent {
match self.check_balances()? {
Async::NotReady => return Ok(Async::NotReady),
_ => (),
}
}
let _ = self.get_gas_prices();
let d_relay = try_bridge!(self.deposit_relay.poll().map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "deposit_relay")))
.map(BridgeChecked::DepositRelay);
if d_relay.is_some() {
self.check_balances()?;
}
let w_relay = try_bridge!(self.withdraw_relay.poll().map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "withdraw_relay"))).
map(BridgeChecked::WithdrawRelay);
if w_relay.is_some() {
self.check_balances()?;
}
let w_confirm = try_bridge!(self.withdraw_confirm.poll().map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "withdraw_confirm"))).
map(BridgeChecked::WithdrawConfirm);
if w_confirm.is_some() {
self.check_balances()?;
}
let result: Vec<_> = [d_relay, w_relay, w_confirm]
.into_iter()
.filter_map(|c| *c)
.collect();
if result.is_empty() {
return Ok(Async::NotReady);
} else {
self.backend.save(result)?;
BridgeStatus::NextItem(Some(()))
}
let item = try_stream!(self.bridge.poll());
BridgeStatus::NextItem(Some(item))
},
BridgeStatus::NextItem(ref mut v) => match v.take() {
None => BridgeStatus::Wait,
some => return Ok(some.into()),
},
None => BridgeStatus::Init,
some => {
return Ok(some.into());
}
}
};
self.state = next_state;
@ -254,28 +223,43 @@ mod tests {
extern crate tempdir;
use self::tempdir::TempDir;
use database::Database;
use super::{BridgeBackend, FileBackend, BridgeChecked};
use super::{Bridge, BridgeChecked};
use error::Error;
use tokio_core::reactor::Core;
use futures::{Stream, stream};
#[test]
fn test_file_backend() {
fn test_database_updates() {
let tempdir = TempDir::new("test_file_backend").unwrap();
let mut path = tempdir.path().to_owned();
path.push("db");
let mut backend = FileBackend {
let bridge = Bridge {
path: path.clone(),
database: Database::default(),
event_stream: stream::iter_ok::<_, Error>(vec![BridgeChecked::DepositRelay(1)]),
};
backend.save(vec![BridgeChecked::DepositRelay(1)]).unwrap();
assert_eq!(1, backend.database.checked_deposit_relay);
assert_eq!(0, backend.database.checked_withdraw_confirm);
assert_eq!(0, backend.database.checked_withdraw_relay);
backend.save(vec![BridgeChecked::DepositRelay(2), BridgeChecked::WithdrawConfirm(3), BridgeChecked::WithdrawRelay(2)]).unwrap();
assert_eq!(2, backend.database.checked_deposit_relay);
assert_eq!(3, backend.database.checked_withdraw_confirm);
assert_eq!(2, backend.database.checked_withdraw_relay);
let mut event_loop = Core::new().unwrap();
let _ = event_loop.run(bridge.collect());
let loaded = Database::load(path).unwrap();
assert_eq!(backend.database, loaded);
let db = Database::load(&path).unwrap();
assert_eq!(1, db.checked_deposit_relay);
assert_eq!(0, db.checked_withdraw_confirm);
assert_eq!(0, db.checked_withdraw_relay);
let bridge = Bridge {
path: path.clone(),
database: Database::default(),
event_stream: stream::iter_ok::<_, Error>(vec![BridgeChecked::DepositRelay(2), BridgeChecked::WithdrawConfirm(3), BridgeChecked::WithdrawRelay(2)]),
};
let mut event_loop = Core::new().unwrap();
let _ = event_loop.run(bridge.collect());
let db = Database::load(&path).unwrap();
assert_eq!(2, db.checked_deposit_relay);
assert_eq!(3, db.checked_withdraw_confirm);
assert_eq!(2, db.checked_withdraw_relay);
}
}

View File

@ -13,6 +13,7 @@ use message_to_mainnet::{MessageToMainnet, MESSAGE_LENGTH};
use ethcore_transaction::{Transaction, Action};
use itertools::Itertools;
use super::nonce::{NonceCheck, SendRawTransaction};
use super::BridgeChecked;
fn withdraws_filter(foreign: &foreign::ForeignBridge, address: Address) -> FilterBuilder {
let filter = foreign.events().withdraw().create_filter();
@ -68,7 +69,7 @@ pub struct WithdrawConfirm<T: Transport> {
}
impl<T: Transport> Stream for WithdrawConfirm<T> {
type Item = u64;
type Item = BridgeChecked;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -153,7 +154,7 @@ impl<T: Transport> Stream for WithdrawConfirm<T> {
info!("waiting for new withdraws that should get signed");
WithdrawConfirmState::Wait
},
some => return Ok(some.into()),
Some(v) => return Ok(Some(BridgeChecked::WithdrawConfirm(v)).into()),
}
};
self.state = next_state;

View File

@ -15,6 +15,7 @@ use message_to_mainnet::MessageToMainnet;
use signature::Signature;
use ethcore_transaction::{Transaction, Action};
use super::nonce::{NonceCheck, SendRawTransaction};
use super::BridgeChecked;
use itertools::Itertools;
/// returns a filter for `ForeignBridge.CollectedSignatures` events
@ -109,7 +110,7 @@ pub struct WithdrawRelay<T: Transport> {
}
impl<T: Transport> Stream for WithdrawRelay<T> {
type Item = u64;
type Item = BridgeChecked;
type Error = Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@ -250,7 +251,7 @@ impl<T: Transport> Stream for WithdrawRelay<T> {
info!("waiting for signed withdraws to relay");
WithdrawRelayState::Wait
},
some => return Ok(some.into()),
Some(v) => return Ok(Some(BridgeChecked::WithdrawRelay(v)).into()),
}
};
self.state = next_state;