some docs
This commit is contained in:
parent
2059901b4a
commit
55b3599984
45
src/api.rs
45
src/api.rs
|
@ -1,4 +1,3 @@
|
|||
use std::vec;
|
||||
use std::time::Duration;
|
||||
use futures::{Future, Stream, Poll, Async};
|
||||
use futures_after::{After, AfterStream};
|
||||
|
@ -9,55 +8,80 @@ use web3::types::{Log, Filter, H256, Block, BlockId, BlockNumber, U256, FilterBu
|
|||
use web3::helpers::CallResult;
|
||||
use error::{Error, ErrorKind};
|
||||
|
||||
/// Imperative alias for web3 function.
|
||||
pub use web3::confirm::send_transaction_with_confirmation;
|
||||
|
||||
/// Imperative wrapper for web3 function.
|
||||
pub fn logs<T: Transport>(transport: T, filter: &Filter) -> CallResult<Vec<Log>, T::Out> {
|
||||
api::Eth::new(transport).logs(filter)
|
||||
}
|
||||
|
||||
/// Imperative wrapper for web3 function.
|
||||
pub fn block<T: Transport>(transport: T, id: BlockId) -> CallResult<Block<H256>, T::Out> {
|
||||
api::Eth::new(transport).block(id)
|
||||
}
|
||||
|
||||
/// Imperative wrapper for web3 function.
|
||||
pub fn block_number<T: Transport>(transport: T) -> CallResult<U256, T::Out> {
|
||||
api::Eth::new(transport).block_number()
|
||||
}
|
||||
|
||||
/// Imperative wrapper for web3 function.
|
||||
pub fn send_transaction<T: Transport>(transport: T, tx: TransactionRequest) -> CallResult<H256, T::Out> {
|
||||
api::Eth::new(transport).send_transaction(tx)
|
||||
}
|
||||
|
||||
/// Used for `LogStream` initialization.
|
||||
pub struct LogStreamInit {
|
||||
pub after: u64,
|
||||
pub filter: FilterBuilder,
|
||||
pub poll_interval: Duration,
|
||||
pub confirmations: usize,
|
||||
pub confirmations: u64,
|
||||
}
|
||||
|
||||
/// Contains all logs matching `LogStream` filter in inclusive range `[from, to]`.
|
||||
pub struct LogStreamItem {
|
||||
pub from: u64,
|
||||
pub to: u64,
|
||||
pub logs: Vec<Log>,
|
||||
}
|
||||
|
||||
pub enum LogStreamState<T: Transport> {
|
||||
/// Log Stream state.
|
||||
enum LogStreamState<T: Transport> {
|
||||
/// Log Stream is waiting for timer to poll.
|
||||
Wait,
|
||||
/// Fetching best block.
|
||||
FetchBlockNumber(CallResult<U256, T::Out>),
|
||||
/// Fetching logs for new best block.
|
||||
FetchLogs {
|
||||
from: u64,
|
||||
to: u64,
|
||||
future: CallResult<Vec<Log>, T::Out>,
|
||||
},
|
||||
/// All logs has been fetched.
|
||||
NextItem(Option<LogStreamItem>),
|
||||
}
|
||||
|
||||
/// Creates new `LogStream`.
|
||||
pub fn log_stream<T: Transport>(transport: T, init: LogStreamInit) -> LogStream<T> {
|
||||
LogStream {
|
||||
transport,
|
||||
interval: Timer::default().interval(init.poll_interval),
|
||||
state: LogStreamState::Wait,
|
||||
after: init.after,
|
||||
filter: init.filter,
|
||||
confirmations: init.confirmations,
|
||||
}
|
||||
}
|
||||
|
||||
/// Stream of confirmed logs.
|
||||
pub struct LogStream<T: Transport> {
|
||||
transport: T,
|
||||
interval: Interval,
|
||||
state: LogStreamState<T>,
|
||||
after: u64,
|
||||
filter: FilterBuilder,
|
||||
confirmations: usize,
|
||||
confirmations: u64,
|
||||
}
|
||||
|
||||
impl<T: Transport> Stream for LogStream<T> {
|
||||
|
@ -73,7 +97,7 @@ impl<T: Transport> Stream for LogStream<T> {
|
|||
},
|
||||
LogStreamState::FetchBlockNumber(ref mut future) => {
|
||||
let last_block = try_ready!(future.poll().map_err(ErrorKind::Web3)).low_u64();
|
||||
let last_confirmed_block = last_block.saturating_sub(self.confirmations as u64);
|
||||
let last_confirmed_block = last_block.saturating_sub(self.confirmations);
|
||||
if last_confirmed_block > self.after {
|
||||
let from = self.after + 1;
|
||||
let filter = self.filter.clone()
|
||||
|
@ -110,14 +134,3 @@ impl<T: Transport> Stream for LogStream<T> {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn log_stream<T: Transport>(transport: T, init: LogStreamInit) -> LogStream<T> {
|
||||
LogStream {
|
||||
transport,
|
||||
interval: Timer::default().interval(init.poll_interval),
|
||||
state: LogStreamState::Wait,
|
||||
after: init.after,
|
||||
filter: init.filter,
|
||||
confirmations: init.confirmations,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,26 +3,45 @@ use futures::{Future, Stream, Poll, Async};
|
|||
use futures::future::{JoinAll, join_all};
|
||||
use web3::Transport;
|
||||
use web3::helpers::CallResult;
|
||||
use web3::types::{TransactionRequest, H256, Log};
|
||||
use web3::types::{TransactionRequest, H256, Log, Address};
|
||||
use api::{LogStream, self};
|
||||
use error::{Error, ErrorKind};
|
||||
use database::Database;
|
||||
use app::App;
|
||||
|
||||
pub enum DepositRelayState<T: Transport> {
|
||||
/// State of deposits relay.
|
||||
enum DepositRelayState<T: Transport> {
|
||||
/// Deposit relay is waiting for logs.
|
||||
Wait,
|
||||
/// Relaying deposits in progress.
|
||||
RelayDeposits {
|
||||
future: JoinAll<Vec<CallResult<H256, T::Out>>>,
|
||||
block: u64,
|
||||
},
|
||||
/// All deposits from given block has been relayed.
|
||||
Yield(Option<u64>),
|
||||
}
|
||||
|
||||
fn deposit_relay<T: Transport + Clone>(app: Arc<App<T>>, init: &Database) -> DepositRelay<T> {
|
||||
let logs_init = api::LogStreamInit {
|
||||
after: init.checked_deposit_relay,
|
||||
poll_interval: app.config.mainnet.poll_interval,
|
||||
confirmations: app.config.mainnet.required_confirmations,
|
||||
filter: app.mainnet_bridge().deposits_filter(init.mainnet_contract_address.clone()),
|
||||
};
|
||||
DepositRelay {
|
||||
logs: api::log_stream(app.connections.mainnet.clone(), logs_init),
|
||||
testnet_contract: init.testnet_contract_address.clone(),
|
||||
state: DepositRelayState::Wait,
|
||||
app,
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DepositRelay<T: Transport> {
|
||||
app: Arc<App<T>>,
|
||||
logs: LogStream<T>,
|
||||
state: DepositRelayState<T>,
|
||||
init: Database,
|
||||
testnet_contract: Address,
|
||||
}
|
||||
|
||||
impl<T: Transport> Stream for DepositRelay<T> {
|
||||
|
@ -42,7 +61,7 @@ impl<T: Transport> Stream for DepositRelay<T> {
|
|||
.map(|deposit| self.app.testnet_bridge().deposit_payload(deposit))
|
||||
.map(|payload| TransactionRequest {
|
||||
from: self.app.config.testnet.account.clone(),
|
||||
to: Some(self.init.testnet_contract_address.clone()),
|
||||
to: Some(self.testnet_contract.clone()),
|
||||
gas: Some(self.app.config.testnet.txs.deposit.gas.into()),
|
||||
gas_price: Some(self.app.config.testnet.txs.deposit.gas_price.into()),
|
||||
value: Some(self.app.config.testnet.txs.deposit.value.into()),
|
||||
|
|
|
@ -2,7 +2,6 @@ mod deposit_relay;
|
|||
mod withdraw_confirm;
|
||||
mod withdraw_relay;
|
||||
|
||||
use std::vec;
|
||||
use futures::{Stream, Poll, Async};
|
||||
use web3::Transport;
|
||||
use error::Error;
|
||||
|
@ -19,7 +18,7 @@ pub enum BridgeChecked {
|
|||
|
||||
enum BridgeStatus {
|
||||
Wait,
|
||||
NextItem(vec::IntoIter<BridgeChecked>),
|
||||
NextItem(Option<Vec<BridgeChecked>>),
|
||||
}
|
||||
|
||||
pub struct Bridge<T: Transport> {
|
||||
|
@ -30,7 +29,7 @@ pub struct Bridge<T: Transport> {
|
|||
}
|
||||
|
||||
impl<T: Transport> Stream for Bridge<T> {
|
||||
type Item = BridgeChecked;
|
||||
type Item = Vec<BridgeChecked>;
|
||||
type Error = Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
|
@ -41,14 +40,13 @@ impl<T: Transport> Stream for Bridge<T> {
|
|||
let w_relay = try_channel!(self.withdraw_relay.poll()).map(BridgeChecked::WithdrawRelay);
|
||||
let w_confirm = try_channel!(self.withdraw_confirm.poll()).map(BridgeChecked::WithdrawConfirm);
|
||||
|
||||
let result: Vec<_> = [d_relay, w_relay, w_confirm]
|
||||
let result = [d_relay, w_relay, w_confirm]
|
||||
.into_iter()
|
||||
.filter_map(|c| *c)
|
||||
.collect();
|
||||
BridgeStatus::NextItem(result.into_iter())
|
||||
|
||||
BridgeStatus::NextItem(Some(result))
|
||||
},
|
||||
BridgeStatus::NextItem(ref mut iter) => match iter.next() {
|
||||
BridgeStatus::NextItem(ref mut v) => match v.take() {
|
||||
None => BridgeStatus::Wait,
|
||||
some => return Ok(some.into()),
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue