diff --git a/src/mempool.rs b/src/mempool.rs index f6159a1..da43318 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -33,8 +33,8 @@ pub enum MemPoolMsg { } struct MemPoolHandler { - coin: u8, - id_account: u32, + pub coin: u8, + pub id_account: u32, tx_mesg: Sender, } @@ -47,10 +47,28 @@ impl MemPoolHandler { } } - pub async fn subscribe(&self) -> anyhow::Result<()> { + pub fn run(handler: MemPoolHandler, mut shutdown: tokio::sync::broadcast::Receiver<()>) -> anyhow::Result<()> { + tokio::spawn(async move { + let r = tokio::select! { + res = handler.event_loop() => { + res + } + _ = shutdown.recv() => { + log::info!("Closing mempool stream connection for {} {}", handler.coin, handler.id_account); + Ok(()) + } + }; + log::info!("MemPoolHandler ended {} {}", handler.coin, handler.id_account); + r + }); + Ok(()) + } + + pub async fn event_loop(&self) -> anyhow::Result<()> { let tx_mesg = self.tx_mesg.clone(); let coin = self.coin; let id_account = self.id_account; + log::info!("Start sub for {} {}", coin, id_account); let c = CoinConfig::get(self.coin); let mut client = c.connect_lwd().await?; let (nfs, sapling_ivk, orchard_ivk) = { @@ -75,16 +93,13 @@ impl MemPoolHandler { .get_mempool_stream(Request::new(Empty {})) .await? .into_inner(); - tokio::spawn(async move { - while let Some(raw_tx) = mempool_stream.message().await? { - let balance = mempool_impl.scan_transaction(&raw_tx)?; - let _ = tx_mesg - .send(MemPoolMsg::Balance(coin, id_account, balance)) - .await; - } - let _ = tx_mesg.send(MemPoolMsg::Close(coin, id_account)).await; - Ok::<_, anyhow::Error>(()) - }); + while let Some(raw_tx) = mempool_stream.message().await? { + let balance = mempool_impl.scan_transaction(&raw_tx)?; + let _ = tx_mesg + .send(MemPoolMsg::Balance(coin, id_account, balance)) + .await; + } + let _ = tx_mesg.send(MemPoolMsg::Close(coin, id_account)).await; Ok(()) } } @@ -127,44 +142,70 @@ impl MemPoolRunner { } } +struct ActiveSub { + coin: u8, + account: u32, + tx_shutdown: tokio::sync::broadcast::Sender<()>, +} + pub async fn run_mempool_loop( tx_mesg: Sender, mut rx_mesg: Receiver, f: F, ) -> anyhow::Result<()> { log::info!("MEMPOOL run"); - let mut active_coin = 0; - let mut active_account = 0; - let mut subscribed = false; + let mut active_sub: Option = None; while let Some(message) = rx_mesg.recv().await { + log::info!("{:?}", message); match message { MemPoolMsg::Active(coin, id_account) => { - if coin != active_coin || id_account != active_account { - active_coin = coin; - active_account = id_account; - subscribed = false; - let _ = tx_mesg.send(MemPoolMsg::Subscribe(active_coin, active_account)).await; + match active_sub.take() { + Some(ActiveSub {coin: active_coin, account: active_account, tx_shutdown}) => { + if coin != active_coin || id_account != active_account { + tx_shutdown.send(())?; // Close current connection + let _ = tx_mesg.send(MemPoolMsg::Subscribe(coin, id_account)).await; + } + else { // same active account, just put it back + active_sub = Some(ActiveSub { + coin: active_coin, + account: active_account, + tx_shutdown + }); + } + } + None => { + let _ = tx_mesg.send(MemPoolMsg::Subscribe(coin, id_account)).await; + } } } MemPoolMsg::Subscribe(coin, id_account) => { - if !subscribed { + if active_sub.is_none() { let mempool_handler = MemPoolHandler::new(coin, id_account, tx_mesg.clone()); - mempool_handler.subscribe().await?; - subscribed = true; + let (tx_shutdown, rx_shutdown) = tokio::sync::broadcast::channel::<()>(1); + active_sub = Some(ActiveSub { + coin, + account: id_account, + tx_shutdown + }); + MemPoolHandler::run(mempool_handler, rx_shutdown)?; } } MemPoolMsg::Balance(coin, id_account, balance) => { - if coin == active_coin && id_account == active_account { - f(balance); + if let Some(ActiveSub { coin: active_coin, account: active_account, .. }) = active_sub.as_ref() { + if coin == *active_coin && id_account == *active_account { + f(balance); + } } } MemPoolMsg::Close(coin, id_account) => { - if coin == active_coin && id_account == active_account { - subscribed = false; - let _ = tx_mesg - .send(MemPoolMsg::Subscribe(active_coin, active_account)) - .await; - f(0); + let active = active_sub.take(); + if let Some(ActiveSub { coin: active_coin, account: active_account , .. }) = active { + if coin == active_coin && id_account == active_account { + f(0); + let _ = tx_mesg + .send(MemPoolMsg::Subscribe(active_coin, active_account)) + .await; + } } } }