Merge pull request #95 from yrashk/mod-refactor
Problem: potential loss of database updates
This commit is contained in:
commit
9b131a2385
|
@ -11,6 +11,7 @@ use util::web3_filter;
|
|||
use app::App;
|
||||
use ethcore_transaction::{Transaction, Action};
|
||||
use super::nonce::{NonceCheck, SendRawTransaction};
|
||||
use super::BridgeChecked;
|
||||
use itertools::Itertools;
|
||||
|
||||
fn deposits_filter(home: &home::HomeBridge, address: Address) -> FilterBuilder {
|
||||
|
@ -72,7 +73,7 @@ pub struct DepositRelay<T: Transport> {
|
|||
}
|
||||
|
||||
impl<T: Transport> Stream for DepositRelay<T> {
|
||||
type Item = u64;
|
||||
type Item = BridgeChecked;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
|
@ -126,7 +127,7 @@ impl<T: Transport> Stream for DepositRelay<T> {
|
|||
},
|
||||
DepositRelayState::Yield(ref mut block) => match block.take() {
|
||||
None => DepositRelayState::Wait,
|
||||
some => return Ok(some.into()),
|
||||
Some(v) => return Ok(Some(BridgeChecked::DepositRelay(v)).into()),
|
||||
}
|
||||
};
|
||||
self.state = next_state;
|
||||
|
|
|
@ -15,7 +15,7 @@ use web3::Transport;
|
|||
use web3::types::U256;
|
||||
use app::App;
|
||||
use database::Database;
|
||||
use error::{Error, ErrorKind, Result};
|
||||
use error::{Error, ErrorKind};
|
||||
use tokio_core::reactor::Handle;
|
||||
|
||||
pub use self::deploy::{Deploy, Deployed, create_deploy};
|
||||
|
@ -34,57 +34,51 @@ pub enum BridgeChecked {
|
|||
WithdrawConfirm(u64),
|
||||
}
|
||||
|
||||
pub trait BridgeBackend {
|
||||
fn save(&mut self, checks: Vec<BridgeChecked>) -> Result<()>;
|
||||
}
|
||||
|
||||
pub struct FileBackend {
|
||||
pub struct Bridge<ES: Stream<Item = BridgeChecked>> {
|
||||
path: PathBuf,
|
||||
database: Database,
|
||||
event_stream: ES,
|
||||
}
|
||||
|
||||
impl BridgeBackend for FileBackend {
|
||||
fn save(&mut self, checks: Vec<BridgeChecked>) -> Result<()> {
|
||||
for check in checks {
|
||||
match check {
|
||||
BridgeChecked::DepositRelay(n) => {
|
||||
self.database.checked_deposit_relay = n;
|
||||
},
|
||||
BridgeChecked::WithdrawRelay(n) => {
|
||||
self.database.checked_withdraw_relay = n;
|
||||
},
|
||||
BridgeChecked::WithdrawConfirm(n) => {
|
||||
self.database.checked_withdraw_confirm = n;
|
||||
},
|
||||
}
|
||||
}
|
||||
impl<ES: Stream<Item = BridgeChecked, Error = Error>> Stream for Bridge<ES> {
|
||||
type Item = ();
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
let check = try_stream!(self.event_stream.poll());
|
||||
match check {
|
||||
BridgeChecked::DepositRelay(n) => {
|
||||
self.database.checked_deposit_relay = n;
|
||||
},
|
||||
BridgeChecked::WithdrawRelay(n) => {
|
||||
self.database.checked_withdraw_relay = n;
|
||||
},
|
||||
BridgeChecked::WithdrawConfirm(n) => {
|
||||
self.database.checked_withdraw_confirm = n;
|
||||
},
|
||||
}
|
||||
let file = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(&self.path)?;
|
||||
|
||||
self.database.save(file)
|
||||
self.database.save(file)?;
|
||||
Ok(Async::Ready(Some(())))
|
||||
}
|
||||
}
|
||||
|
||||
enum BridgeStatus {
|
||||
Wait,
|
||||
NextItem(Option<()>),
|
||||
}
|
||||
|
||||
|
||||
/// Creates new bridge.
|
||||
pub fn create_bridge<T: Transport + Clone>(app: Arc<App<T>>, init: &Database, handle: &Handle, home_chain_id: u64, foreign_chain_id: u64) -> Bridge<T, FileBackend> {
|
||||
let backend = FileBackend {
|
||||
pub fn create_bridge<'a, T: Transport + 'a + Clone>(app: Arc<App<T>>, init: &Database, handle: &Handle, home_chain_id: u64, foreign_chain_id: u64) -> Bridge<BridgeEventStream<'a, T>> {
|
||||
Bridge {
|
||||
path: app.database_path.clone(),
|
||||
database: init.clone(),
|
||||
};
|
||||
|
||||
create_bridge_backed_by(app, init, backend, handle, home_chain_id, foreign_chain_id)
|
||||
event_stream: create_bridge_event_stream(app, init, handle, home_chain_id, foreign_chain_id),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates new bridge writing to custom backend.
|
||||
pub fn create_bridge_backed_by<T: Transport + Clone, F: BridgeBackend>(app: Arc<App<T>>, init: &Database, backend: F, handle: &Handle, home_chain_id: u64, foreign_chain_id: u64) -> Bridge<T, F> {
|
||||
pub fn create_bridge_event_stream<'a, T: Transport + 'a + Clone>(app: Arc<App<T>>, init: &Database, handle: &Handle, home_chain_id: u64, foreign_chain_id: u64) -> BridgeEventStream<'a, T> {
|
||||
let home_balance = Arc::new(RwLock::new(None));
|
||||
let foreign_balance = Arc::new(RwLock::new(None));
|
||||
|
||||
|
@ -105,16 +99,22 @@ pub fn create_bridge_backed_by<T: Transport + Clone, F: BridgeBackend>(app: Arc<
|
|||
let home_gas_price = Arc::new(RwLock::new(app.config.home.default_gas_price));
|
||||
let foreign_gas_price = Arc::new(RwLock::new(app.config.foreign.default_gas_price));
|
||||
|
||||
Bridge {
|
||||
let deposit_relay = create_deposit_relay(app.clone(), init, foreign_balance.clone(), foreign_chain_id, foreign_gas_price.clone())
|
||||
.map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "deposit_relay").into());
|
||||
let withdraw_relay = create_withdraw_relay(app.clone(), init, home_balance.clone(), home_chain_id, home_gas_price.clone())
|
||||
.map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "withdraw_relay").into());
|
||||
let withdraw_confirm = create_withdraw_confirm(app.clone(), init, foreign_balance.clone(), foreign_chain_id, foreign_gas_price.clone())
|
||||
.map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "withdraw_confirm").into());
|
||||
|
||||
let bridge = Box::new(deposit_relay.select(withdraw_relay).select(withdraw_confirm));
|
||||
|
||||
BridgeEventStream {
|
||||
foreign_balance_check: create_balance_check(app.clone(), app.connections.foreign.clone(), app.config.foreign.clone()),
|
||||
home_balance_check: create_balance_check(app.clone(), app.connections.home.clone(), app.config.home.clone()),
|
||||
foreign_balance: foreign_balance.clone(),
|
||||
home_balance: home_balance.clone(),
|
||||
deposit_relay: create_deposit_relay(app.clone(), init, foreign_balance.clone(), foreign_chain_id, foreign_gas_price.clone()),
|
||||
withdraw_relay: create_withdraw_relay(app.clone(), init, home_balance.clone(), home_chain_id, home_gas_price.clone()),
|
||||
withdraw_confirm: create_withdraw_confirm(app.clone(), init, foreign_balance.clone(), foreign_chain_id, foreign_gas_price.clone()),
|
||||
state: BridgeStatus::Wait,
|
||||
backend,
|
||||
bridge,
|
||||
state: BridgeStatus::Init,
|
||||
running: app.running.clone(),
|
||||
home_gas_stream,
|
||||
foreign_gas_stream,
|
||||
|
@ -123,16 +123,19 @@ pub fn create_bridge_backed_by<T: Transport + Clone, F: BridgeBackend>(app: Arc<
|
|||
}
|
||||
}
|
||||
|
||||
pub struct Bridge<T: Transport, F> {
|
||||
enum BridgeStatus {
|
||||
Init,
|
||||
Wait,
|
||||
NextItem(Option<BridgeChecked>),
|
||||
}
|
||||
|
||||
pub struct BridgeEventStream<'a, T: Transport + 'a> {
|
||||
home_balance_check: BalanceCheck<T>,
|
||||
foreign_balance_check: BalanceCheck<T>,
|
||||
home_balance: Arc<RwLock<Option<U256>>>,
|
||||
foreign_balance: Arc<RwLock<Option<U256>>>,
|
||||
deposit_relay: DepositRelay<T>,
|
||||
withdraw_relay: WithdrawRelay<T>,
|
||||
withdraw_confirm: WithdrawConfirm<T>,
|
||||
bridge: Box<Stream<Item = BridgeChecked, Error = Error> + 'a>,
|
||||
state: BridgeStatus,
|
||||
backend: F,
|
||||
running: Arc<AtomicBool>,
|
||||
home_gas_stream: Option<StandardGasPriceStream>,
|
||||
foreign_gas_stream: Option<StandardGasPriceStream>,
|
||||
|
@ -142,7 +145,7 @@ pub struct Bridge<T: Transport, F> {
|
|||
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
impl<T: Transport, F: BridgeBackend> Bridge<T, F> {
|
||||
impl<'a, T: Transport + 'a> BridgeEventStream<'a, T> {
|
||||
fn check_balances(&mut self) -> Poll<Option<()>, Error> {
|
||||
let mut home_balance = self.home_balance.write().unwrap();
|
||||
let mut foreign_balance = self.foreign_balance.write().unwrap();
|
||||
|
@ -178,70 +181,36 @@ impl<T: Transport, F: BridgeBackend> Bridge<T, F> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T: Transport, F: BridgeBackend> Stream for Bridge<T, F> {
|
||||
type Item = ();
|
||||
impl<'a, T: Transport + 'a> Stream for BridgeEventStream<'a, T> {
|
||||
type Item = BridgeChecked;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
loop {
|
||||
let next_state = match self.state {
|
||||
BridgeStatus::Init => {
|
||||
match self.check_balances()? {
|
||||
Async::NotReady => return Ok(Async::NotReady),
|
||||
_ => (),
|
||||
}
|
||||
BridgeStatus::Wait
|
||||
},
|
||||
BridgeStatus::Wait => {
|
||||
if !self.running.load(Ordering::SeqCst) {
|
||||
return Err(ErrorKind::ShutdownRequested.into())
|
||||
}
|
||||
|
||||
// Intended to be used upon startup
|
||||
let balance_is_absent = {
|
||||
let mut home_balance = self.home_balance.read().unwrap();
|
||||
let mut foreign_balance = self.foreign_balance.read().unwrap();
|
||||
home_balance.is_none() || foreign_balance.is_none()
|
||||
};
|
||||
if balance_is_absent {
|
||||
match self.check_balances()? {
|
||||
Async::NotReady => return Ok(Async::NotReady),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
let _ = self.get_gas_prices();
|
||||
|
||||
let d_relay = try_bridge!(self.deposit_relay.poll().map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "deposit_relay")))
|
||||
.map(BridgeChecked::DepositRelay);
|
||||
|
||||
if d_relay.is_some() {
|
||||
self.check_balances()?;
|
||||
}
|
||||
|
||||
let w_relay = try_bridge!(self.withdraw_relay.poll().map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "withdraw_relay"))).
|
||||
map(BridgeChecked::WithdrawRelay);
|
||||
|
||||
if w_relay.is_some() {
|
||||
self.check_balances()?;
|
||||
}
|
||||
|
||||
let w_confirm = try_bridge!(self.withdraw_confirm.poll().map_err(|e| ErrorKind::ContextualizedError(Box::new(e), "withdraw_confirm"))).
|
||||
map(BridgeChecked::WithdrawConfirm);
|
||||
|
||||
if w_confirm.is_some() {
|
||||
self.check_balances()?;
|
||||
}
|
||||
|
||||
let result: Vec<_> = [d_relay, w_relay, w_confirm]
|
||||
.into_iter()
|
||||
.filter_map(|c| *c)
|
||||
.collect();
|
||||
|
||||
if result.is_empty() {
|
||||
return Ok(Async::NotReady);
|
||||
} else {
|
||||
self.backend.save(result)?;
|
||||
BridgeStatus::NextItem(Some(()))
|
||||
}
|
||||
let item = try_stream!(self.bridge.poll());
|
||||
BridgeStatus::NextItem(Some(item))
|
||||
},
|
||||
BridgeStatus::NextItem(ref mut v) => match v.take() {
|
||||
None => BridgeStatus::Wait,
|
||||
some => return Ok(some.into()),
|
||||
},
|
||||
None => BridgeStatus::Init,
|
||||
some => {
|
||||
return Ok(some.into());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
self.state = next_state;
|
||||
|
@ -254,28 +223,43 @@ mod tests {
|
|||
extern crate tempdir;
|
||||
use self::tempdir::TempDir;
|
||||
use database::Database;
|
||||
use super::{BridgeBackend, FileBackend, BridgeChecked};
|
||||
use super::{Bridge, BridgeChecked};
|
||||
use error::Error;
|
||||
use tokio_core::reactor::Core;
|
||||
use futures::{Stream, stream};
|
||||
|
||||
#[test]
|
||||
fn test_file_backend() {
|
||||
fn test_database_updates() {
|
||||
let tempdir = TempDir::new("test_file_backend").unwrap();
|
||||
let mut path = tempdir.path().to_owned();
|
||||
path.push("db");
|
||||
let mut backend = FileBackend {
|
||||
|
||||
let bridge = Bridge {
|
||||
path: path.clone(),
|
||||
database: Database::default(),
|
||||
event_stream: stream::iter_ok::<_, Error>(vec![BridgeChecked::DepositRelay(1)]),
|
||||
};
|
||||
|
||||
backend.save(vec![BridgeChecked::DepositRelay(1)]).unwrap();
|
||||
assert_eq!(1, backend.database.checked_deposit_relay);
|
||||
assert_eq!(0, backend.database.checked_withdraw_confirm);
|
||||
assert_eq!(0, backend.database.checked_withdraw_relay);
|
||||
backend.save(vec![BridgeChecked::DepositRelay(2), BridgeChecked::WithdrawConfirm(3), BridgeChecked::WithdrawRelay(2)]).unwrap();
|
||||
assert_eq!(2, backend.database.checked_deposit_relay);
|
||||
assert_eq!(3, backend.database.checked_withdraw_confirm);
|
||||
assert_eq!(2, backend.database.checked_withdraw_relay);
|
||||
let mut event_loop = Core::new().unwrap();
|
||||
let _ = event_loop.run(bridge.collect());
|
||||
|
||||
let loaded = Database::load(path).unwrap();
|
||||
assert_eq!(backend.database, loaded);
|
||||
let db = Database::load(&path).unwrap();
|
||||
assert_eq!(1, db.checked_deposit_relay);
|
||||
assert_eq!(0, db.checked_withdraw_confirm);
|
||||
assert_eq!(0, db.checked_withdraw_relay);
|
||||
|
||||
let bridge = Bridge {
|
||||
path: path.clone(),
|
||||
database: Database::default(),
|
||||
event_stream: stream::iter_ok::<_, Error>(vec![BridgeChecked::DepositRelay(2), BridgeChecked::WithdrawConfirm(3), BridgeChecked::WithdrawRelay(2)]),
|
||||
};
|
||||
|
||||
let mut event_loop = Core::new().unwrap();
|
||||
let _ = event_loop.run(bridge.collect());
|
||||
|
||||
let db = Database::load(&path).unwrap();
|
||||
assert_eq!(2, db.checked_deposit_relay);
|
||||
assert_eq!(3, db.checked_withdraw_confirm);
|
||||
assert_eq!(2, db.checked_withdraw_relay);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ use message_to_mainnet::{MessageToMainnet, MESSAGE_LENGTH};
|
|||
use ethcore_transaction::{Transaction, Action};
|
||||
use itertools::Itertools;
|
||||
use super::nonce::{NonceCheck, SendRawTransaction};
|
||||
use super::BridgeChecked;
|
||||
|
||||
fn withdraws_filter(foreign: &foreign::ForeignBridge, address: Address) -> FilterBuilder {
|
||||
let filter = foreign.events().withdraw().create_filter();
|
||||
|
@ -68,7 +69,7 @@ pub struct WithdrawConfirm<T: Transport> {
|
|||
}
|
||||
|
||||
impl<T: Transport> Stream for WithdrawConfirm<T> {
|
||||
type Item = u64;
|
||||
type Item = BridgeChecked;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
|
@ -153,7 +154,7 @@ impl<T: Transport> Stream for WithdrawConfirm<T> {
|
|||
info!("waiting for new withdraws that should get signed");
|
||||
WithdrawConfirmState::Wait
|
||||
},
|
||||
some => return Ok(some.into()),
|
||||
Some(v) => return Ok(Some(BridgeChecked::WithdrawConfirm(v)).into()),
|
||||
}
|
||||
};
|
||||
self.state = next_state;
|
||||
|
|
|
@ -15,6 +15,7 @@ use message_to_mainnet::MessageToMainnet;
|
|||
use signature::Signature;
|
||||
use ethcore_transaction::{Transaction, Action};
|
||||
use super::nonce::{NonceCheck, SendRawTransaction};
|
||||
use super::BridgeChecked;
|
||||
use itertools::Itertools;
|
||||
|
||||
/// returns a filter for `ForeignBridge.CollectedSignatures` events
|
||||
|
@ -109,7 +110,7 @@ pub struct WithdrawRelay<T: Transport> {
|
|||
}
|
||||
|
||||
impl<T: Transport> Stream for WithdrawRelay<T> {
|
||||
type Item = u64;
|
||||
type Item = BridgeChecked;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
|
@ -250,7 +251,7 @@ impl<T: Transport> Stream for WithdrawRelay<T> {
|
|||
info!("waiting for signed withdraws to relay");
|
||||
WithdrawRelayState::Wait
|
||||
},
|
||||
some => return Ok(some.into()),
|
||||
Some(v) => return Ok(Some(BridgeChecked::WithdrawRelay(v)).into()),
|
||||
}
|
||||
};
|
||||
self.state = next_state;
|
||||
|
|
Loading…
Reference in New Issue