diff --git a/bin/autobahn-router/src/edge_updater.rs b/bin/autobahn-router/src/edge_updater.rs index 42e4898..199268d 100644 --- a/bin/autobahn-router/src/edge_updater.rs +++ b/bin/autobahn-router/src/edge_updater.rs @@ -79,7 +79,7 @@ pub fn spawn_updater_job( register_mint_sender: async_channel::Sender, ready_sender: async_channel::Sender<()>, mut slot_updates: broadcast::Receiver, - mut account_updates: broadcast::Receiver<(Pubkey, u64)>, + mut account_updates: broadcast::Receiver<(Pubkey, Pubkey, u64)>, mut metadata_updates: broadcast::Receiver, mut price_updates: broadcast::Receiver, mut exit: broadcast::Receiver<()>, @@ -311,9 +311,8 @@ impl EdgeUpdater { } } - fn invalidate_one(&mut self, res: Result<(Pubkey, u64), RecvError>) -> bool { - let state = &mut self.state; - let (pk, slot) = match res { + fn invalidate_one(&mut self, res: Result<(Pubkey, Pubkey, u64), RecvError>) -> bool { + let (pk, owner, slot) = match res { Ok(v) => v, Err(broadcast::error::RecvError::Closed) => { error!("account update channel closed unexpectedly"); @@ -328,6 +327,11 @@ impl EdgeUpdater { } }; + // check if we need the update + if !self.do_update(&pk, &owner) { + return true; + } + let state = &mut self.state; if let Some(impacted_edges) = self.dex.edges_per_pk.get(&pk) { for edge in impacted_edges { state.dirty_edges.insert(edge.unique_id(), edge.clone()); @@ -372,6 +376,27 @@ impl EdgeUpdater { } } + // ignore update if current dex does not need it + fn do_update(&mut self, pk: &Pubkey, owner: &Pubkey) -> bool { + if self.dex.edges_per_pk.contains_key(pk) { + return true; + } + match &self.dex.subscription_mode { + DexSubscriptionMode::Accounts(accounts) => { + return accounts.contains(pk) + }, + DexSubscriptionMode::Disabled => { + false + }, + DexSubscriptionMode::Programs(programs) => { + programs.contains(pk) || programs.contains(owner) + }, + DexSubscriptionMode::Mixed(m) => { + m.accounts.contains(pk) || m.token_accounts_for_owner.contains(pk) || m.programs.contains(pk) || m.programs.contains(owner) + } + } + } + fn refresh_some(&mut self) { let state = &mut self.state; if state.dirty_edges.is_empty() || !state.is_ready { diff --git a/bin/autobahn-router/src/main.rs b/bin/autobahn-router/src/main.rs index d957270..fa84d5f 100644 --- a/bin/autobahn-router/src/main.rs +++ b/bin/autobahn-router/src/main.rs @@ -576,7 +576,7 @@ fn start_chaindata_updating( chain_data: ChainDataArcRw, account_writes: async_channel::Receiver, slot_updates: async_channel::Receiver, - account_update_sender: broadcast::Sender<(Pubkey, u64)>, + account_update_sender: broadcast::Sender<(Pubkey, Pubkey, u64)>, mut exit: broadcast::Receiver<()>, ) -> JoinHandle<()> { use mango_feeds_connector::chain_data::SlotData; @@ -643,7 +643,7 @@ fn handle_updated_account( most_recent_seen_slot: &mut u64, chain_data: &mut RwLockWriteGuard, update: AccountOrSnapshotUpdate, - account_update_sender: &broadcast::Sender<(Pubkey, u64)>, + account_update_sender: &broadcast::Sender<(Pubkey, Pubkey, u64)>, ) { use mango_feeds_connector::chain_data::AccountData; use solana_sdk::account::WritableAccount; @@ -652,7 +652,7 @@ fn handle_updated_account( fn one_update( most_recent_seen_slot: &mut u64, chain_data: &mut RwLockWriteGuard, - account_update_sender: &broadcast::Sender<(Pubkey, u64)>, + account_update_sender: &broadcast::Sender<(Pubkey, Pubkey, u64)>, account_write: AccountWrite, ) { chain_data.update_account( @@ -680,7 +680,7 @@ fn handle_updated_account( } // ignore failing sends when there are no receivers - let _err = account_update_sender.send((account_write.pubkey, account_write.slot)); + let _err = account_update_sender.send((account_write.pubkey, account_write.owner, account_write.slot)); } match update { diff --git a/bin/autobahn-router/src/routing.rs b/bin/autobahn-router/src/routing.rs index 489257b..48528d8 100644 --- a/bin/autobahn-router/src/routing.rs +++ b/bin/autobahn-router/src/routing.rs @@ -843,6 +843,14 @@ impl Routing { warn!(valid_edge_count, skipped_bad_price_impact, "pruning"); } + // for mint_vec in out_edges_per_mint_index.iter() { + // for mint in mint_vec { + // let input_mint = mint_to_index.iter().filter(|(_, x)| **x==mint.source_node).map(|(pk,_)| *pk).collect_vec(); + // let output_mint = mint_to_index.iter().filter(|(_, x)| **x==mint.target_node).map(|(pk,_)| *pk).collect_vec(); + // info!("input_mint {:?} {:?}", input_mint, output_mint ); + // } + // } + (valid_edge_count, out_edges_per_mint_index) } diff --git a/fly.toml b/fly.toml index 3ef1e97..0927c77 100644 --- a/fly.toml +++ b/fly.toml @@ -1,5 +1,5 @@ -app = "router-1" -primary_region = "ams" +app = "router-2" +primary_region = "dfw" kill_signal = "SIGTERM" kill_timeout = "30s" @@ -10,8 +10,8 @@ kill_timeout = "30s" cmd = ["autobahn-router", "/usr/local/bin/template-config.toml"] [[vm]] - size = "performance-4x" - memory = "16gb" + size = "performance-16x" + memory = "32gb" [[restart]] policy = "always"