poa-bridge/src/api.rs

124 lines
3.3 KiB
Rust
Raw Normal View History

2017-08-11 04:13:27 -07:00
use std::vec;
use std::time::Duration;
use futures::{Future, Stream, Poll};
use futures_after::{After, AfterStream};
2017-08-12 01:55:18 -07:00
use tokio_timer::{Timer, Interval};
2017-08-11 04:13:27 -07:00
use web3::{self, api, Transport};
use web3::api::{Namespace, FilterStream, CreateFilter};
2017-08-12 07:57:07 -07:00
use web3::types::{Log, Filter, H256, Block, BlockId, BlockNumber, U256, FilterBuilder, TransactionRequest};
2017-08-11 04:13:27 -07:00
use web3::helpers::CallResult;
2017-08-12 01:55:18 -07:00
use error::{Error, ErrorKind};
2017-08-11 04:13:27 -07:00
2017-08-01 02:36:48 -07:00
pub use web3::confirm::send_transaction_with_confirmation;
2017-08-11 04:13:27 -07:00
/// Requests the logs matching `filter` via the `Eth` namespace of `transport`.
///
/// Returns the pending call; it resolves to the matching `Log` entries.
pub fn logs<T: Transport>(transport: T, filter: &Filter) -> CallResult<Vec<Log>, T::Out> {
    let eth = api::Eth::new(transport);
    eth.logs(filter)
}
/// Requests the block identified by `id` via the `Eth` namespace of `transport`.
///
/// Returns the pending call; it resolves to a `Block` whose transactions are
/// represented by their `H256` hashes.
pub fn block<T: Transport>(transport: T, id: BlockId) -> CallResult<Block<H256>, T::Out> {
    let eth = api::Eth::new(transport);
    eth.block(id)
}
2017-08-12 01:55:18 -07:00
pub fn block_number<T: Transport>(transport: T) -> CallResult<U256, T::Out> {
api::Eth::new(transport).block_number()
2017-08-11 04:13:27 -07:00
}
2017-08-12 07:57:07 -07:00
/// Submits the transaction request `tx` via the `Eth` namespace of `transport`.
///
/// Returns the pending call; it resolves to an `H256`
/// (presumably the transaction hash — confirm against the web3 docs).
pub fn send_transaction<T: Transport>(transport: T, tx: TransactionRequest) -> CallResult<H256, T::Out> {
    let eth = api::Eth::new(transport);
    eth.send_transaction(tx)
}
2017-08-12 01:55:18 -07:00
pub struct LogStreamInit {
pub after: u64,
pub filter: FilterBuilder,
pub poll_interval: Duration,
pub confirmations: usize,
2017-08-11 04:13:27 -07:00
}
2017-08-12 01:55:18 -07:00
pub struct LogStreamItem {
pub from: u64,
pub to: u64,
pub logs: Vec<Log>,
2017-08-11 04:13:27 -07:00
}
2017-08-12 01:55:18 -07:00
pub enum LogStreamState<T: Transport> {
Wait,
FetchBlockNumber(CallResult<U256, T::Out>),
FetchLogs {
from: u64,
to: u64,
future: CallResult<Vec<Log>, T::Out>,
},
NextItem(Option<LogStreamItem>),
2017-08-11 04:13:27 -07:00
}
2017-08-12 01:55:18 -07:00
pub struct LogStream<T: Transport> {
2017-08-11 04:13:27 -07:00
transport: T,
2017-08-12 01:55:18 -07:00
interval: Interval,
state: LogStreamState<T>,
after: u64,
filter: FilterBuilder,
confirmations: usize,
2017-08-11 04:13:27 -07:00
}
2017-08-12 01:55:18 -07:00
impl<T: Transport> Stream for LogStream<T> {
type Item = LogStreamItem;
type Error = Error;
2017-08-11 04:13:27 -07:00
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let next_state = match self.state {
2017-08-12 01:55:18 -07:00
LogStreamState::Wait => match try_ready!(self.interval.poll()) {
2017-08-11 04:13:27 -07:00
None => return Ok(None.into()),
2017-08-12 01:55:18 -07:00
Some(_) => LogStreamState::FetchBlockNumber(block_number(&self.transport)),
2017-08-11 04:13:27 -07:00
},
2017-08-12 01:55:18 -07:00
LogStreamState::FetchBlockNumber(ref mut future) => {
let last_block = try_ready!(future.poll().map_err(ErrorKind::Web3)).low_u64();
let last_confirmed_block = last_block.saturating_sub(self.confirmations as u64);
if last_confirmed_block > self.after {
let from = self.after + 1;
let filter = self.filter.clone()
.from_block(from.into())
.to_block(last_confirmed_block.into())
.build();
LogStreamState::FetchLogs {
from: from,
to: last_confirmed_block,
future: logs(&self.transport, &filter)
}
} else {
LogStreamState::Wait
}
2017-08-11 04:13:27 -07:00
},
2017-08-12 01:55:18 -07:00
LogStreamState::FetchLogs { ref mut future, from, to } => {
let logs = try_ready!(future.poll().map_err(ErrorKind::Web3));
let item = LogStreamItem {
from,
to,
logs,
};
self.after = to;
LogStreamState::NextItem(Some(item))
},
LogStreamState::NextItem(ref mut item) => match item.take() {
2017-08-11 04:13:27 -07:00
some => return Ok(some.into()),
2017-08-12 01:55:18 -07:00
None => LogStreamState::Wait,
2017-08-11 04:13:27 -07:00
},
};
2017-08-12 01:55:18 -07:00
2017-08-11 04:13:27 -07:00
self.state = next_state;
}
}
}
2017-08-12 01:55:18 -07:00
pub fn log_stream<T: Transport>(transport: T, init: LogStreamInit) -> LogStream<T> {
LogStream {
2017-08-11 04:13:27 -07:00
transport,
2017-08-12 01:55:18 -07:00
interval: Timer::default().interval(init.poll_interval),
state: LogStreamState::Wait,
after: init.after,
filter: init.filter,
confirmations: init.confirmations,
2017-08-11 04:13:27 -07:00
}
}