Client side mempool disconnect when active account changed

This commit is contained in:
Hanh 2022-11-21 17:23:09 +08:00
parent 65f94e5878
commit be75cd3909
1 changed files with 73 additions and 32 deletions

View File

@ -33,8 +33,8 @@ pub enum MemPoolMsg {
} }
struct MemPoolHandler { struct MemPoolHandler {
coin: u8, pub coin: u8,
id_account: u32, pub id_account: u32,
tx_mesg: Sender<MemPoolMsg>, tx_mesg: Sender<MemPoolMsg>,
} }
@ -47,10 +47,28 @@ impl MemPoolHandler {
} }
} }
pub async fn subscribe(&self) -> anyhow::Result<()> { pub fn run(handler: MemPoolHandler, mut shutdown: tokio::sync::broadcast::Receiver<()>) -> anyhow::Result<()> {
tokio::spawn(async move {
let r = tokio::select! {
res = handler.event_loop() => {
res
}
_ = shutdown.recv() => {
log::info!("Closing mempool stream connection for {} {}", handler.coin, handler.id_account);
Ok(())
}
};
log::info!("MemPoolHandler ended {} {}", handler.coin, handler.id_account);
r
});
Ok(())
}
pub async fn event_loop(&self) -> anyhow::Result<()> {
let tx_mesg = self.tx_mesg.clone(); let tx_mesg = self.tx_mesg.clone();
let coin = self.coin; let coin = self.coin;
let id_account = self.id_account; let id_account = self.id_account;
log::info!("Start sub for {} {}", coin, id_account);
let c = CoinConfig::get(self.coin); let c = CoinConfig::get(self.coin);
let mut client = c.connect_lwd().await?; let mut client = c.connect_lwd().await?;
let (nfs, sapling_ivk, orchard_ivk) = { let (nfs, sapling_ivk, orchard_ivk) = {
@ -75,16 +93,13 @@ impl MemPoolHandler {
.get_mempool_stream(Request::new(Empty {})) .get_mempool_stream(Request::new(Empty {}))
.await? .await?
.into_inner(); .into_inner();
tokio::spawn(async move { while let Some(raw_tx) = mempool_stream.message().await? {
while let Some(raw_tx) = mempool_stream.message().await? { let balance = mempool_impl.scan_transaction(&raw_tx)?;
let balance = mempool_impl.scan_transaction(&raw_tx)?; let _ = tx_mesg
let _ = tx_mesg .send(MemPoolMsg::Balance(coin, id_account, balance))
.send(MemPoolMsg::Balance(coin, id_account, balance)) .await;
.await; }
} let _ = tx_mesg.send(MemPoolMsg::Close(coin, id_account)).await;
let _ = tx_mesg.send(MemPoolMsg::Close(coin, id_account)).await;
Ok::<_, anyhow::Error>(())
});
Ok(()) Ok(())
} }
} }
@ -127,44 +142,70 @@ impl MemPoolRunner {
} }
} }
struct ActiveSub {
coin: u8,
account: u32,
tx_shutdown: tokio::sync::broadcast::Sender<()>,
}
pub async fn run_mempool_loop<F: Fn(i64) + Send + Sync + 'static>( pub async fn run_mempool_loop<F: Fn(i64) + Send + Sync + 'static>(
tx_mesg: Sender<MemPoolMsg>, tx_mesg: Sender<MemPoolMsg>,
mut rx_mesg: Receiver<MemPoolMsg>, mut rx_mesg: Receiver<MemPoolMsg>,
f: F, f: F,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
log::info!("MEMPOOL run"); log::info!("MEMPOOL run");
let mut active_coin = 0; let mut active_sub: Option<ActiveSub> = None;
let mut active_account = 0;
let mut subscribed = false;
while let Some(message) = rx_mesg.recv().await { while let Some(message) = rx_mesg.recv().await {
log::info!("{:?}", message);
match message { match message {
MemPoolMsg::Active(coin, id_account) => { MemPoolMsg::Active(coin, id_account) => {
if coin != active_coin || id_account != active_account { match active_sub.take() {
active_coin = coin; Some(ActiveSub {coin: active_coin, account: active_account, tx_shutdown}) => {
active_account = id_account; if coin != active_coin || id_account != active_account {
subscribed = false; tx_shutdown.send(())?; // Close current connection
let _ = tx_mesg.send(MemPoolMsg::Subscribe(active_coin, active_account)).await; let _ = tx_mesg.send(MemPoolMsg::Subscribe(coin, id_account)).await;
}
else { // same active account, just put it back
active_sub = Some(ActiveSub {
coin: active_coin,
account: active_account,
tx_shutdown
});
}
}
None => {
let _ = tx_mesg.send(MemPoolMsg::Subscribe(coin, id_account)).await;
}
} }
} }
MemPoolMsg::Subscribe(coin, id_account) => { MemPoolMsg::Subscribe(coin, id_account) => {
if !subscribed { if active_sub.is_none() {
let mempool_handler = MemPoolHandler::new(coin, id_account, tx_mesg.clone()); let mempool_handler = MemPoolHandler::new(coin, id_account, tx_mesg.clone());
mempool_handler.subscribe().await?; let (tx_shutdown, rx_shutdown) = tokio::sync::broadcast::channel::<()>(1);
subscribed = true; active_sub = Some(ActiveSub {
coin,
account: id_account,
tx_shutdown
});
MemPoolHandler::run(mempool_handler, rx_shutdown)?;
} }
} }
MemPoolMsg::Balance(coin, id_account, balance) => { MemPoolMsg::Balance(coin, id_account, balance) => {
if coin == active_coin && id_account == active_account { if let Some(ActiveSub { coin: active_coin, account: active_account, .. }) = active_sub.as_ref() {
f(balance); if coin == *active_coin && id_account == *active_account {
f(balance);
}
} }
} }
MemPoolMsg::Close(coin, id_account) => { MemPoolMsg::Close(coin, id_account) => {
if coin == active_coin && id_account == active_account { let active = active_sub.take();
subscribed = false; if let Some(ActiveSub { coin: active_coin, account: active_account , .. }) = active {
let _ = tx_mesg if coin == active_coin && id_account == active_account {
.send(MemPoolMsg::Subscribe(active_coin, active_account)) f(0);
.await; let _ = tx_mesg
f(0); .send(MemPoolMsg::Subscribe(active_coin, active_account))
.await;
}
} }
} }
} }