Use new shared mango-feeds-connector crate for chain_data (#515)

This commit is contained in:
Christian Kamm 2023-03-29 09:46:06 +02:00 committed by GitHub
parent 03250d44da
commit 30984bbef1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 159 additions and 263 deletions

141
Cargo.lock generated
View File

@ -625,6 +625,12 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]]
name = "base64ct"
version = "1.5.3"
@ -821,6 +827,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "buf_redux"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b953a6887648bb07a535631f2bc00fbdb2a2216f135552cb3f534ed136b9c07f"
dependencies = [
"memchr",
"safemem",
]
[[package]]
name = "bumpalo"
version = "3.11.1"
@ -2962,6 +2978,31 @@ dependencies = [
"libc",
]
[[package]]
name = "mango-feeds-connector"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f19edbc11d424f24901120ec7eb09ac5c49b1da805fb93c47950febbf088433"
dependencies = [
"anyhow",
"async-channel",
"async-trait",
"futures 0.3.25",
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core-client",
"log 0.4.17",
"rustls",
"serde",
"serde_derive",
"solana-account-decoder",
"solana-client",
"solana-rpc",
"solana-sdk",
"tokio",
"warp",
"yellowstone-grpc-proto",
]
[[package]]
name = "mango-v4"
version = "0.10.0"
@ -3042,6 +3083,7 @@ dependencies = [
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core-client",
"log 0.4.17",
"mango-feeds-connector",
"mango-v4",
"pyth-sdk-solana",
"reqwest",
@ -3401,6 +3443,24 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "multipart"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00dec633863867f29cb39df64a397cdf4a6354708ddd7759f70c7fb51c5f9182"
dependencies = [
"buf_redux",
"httparse",
"log 0.4.17",
"mime 0.3.16",
"mime_guess",
"quick-error",
"rand 0.8.5",
"safemem",
"tempfile",
"twoway",
]
[[package]]
name = "native-tls"
version = "0.2.11"
@ -4215,6 +4275,12 @@ dependencies = [
"percent-encoding 2.2.0",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quick-protobuf"
version = "0.8.0"
@ -4628,7 +4694,7 @@ dependencies = [
"percent-encoding 2.2.0",
"pin-project-lite",
"rustls",
"rustls-pemfile 1.0.1",
"rustls-pemfile 1.0.2",
"serde",
"serde_json",
"serde_urlencoded",
@ -4785,9 +4851,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.20.7"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
dependencies = [
"log 0.4.17",
"ring",
@ -4802,7 +4868,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
dependencies = [
"openssl-probe",
"rustls-pemfile 1.0.1",
"rustls-pemfile 1.0.2",
"schannel",
"security-framework",
]
@ -4818,11 +4884,11 @@ dependencies = [
[[package]]
name = "rustls-pemfile"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
dependencies = [
"base64 0.13.1",
"base64 0.21.0",
]
[[package]]
@ -4892,6 +4958,12 @@ dependencies = [
"syn 1.0.105",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]]
name = "scopeguard"
version = "1.1.0"
@ -7241,7 +7313,7 @@ dependencies = [
"pin-project",
"prost",
"prost-derive",
"rustls-pemfile 1.0.1",
"rustls-pemfile 1.0.2",
"tokio",
"tokio-rustls",
"tokio-stream",
@ -7459,6 +7531,15 @@ dependencies = [
"webpki-roots",
]
[[package]]
name = "twoway"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59b11b2b5241ba34be09c3cc85a36e56e48f9888862e19cedf23336d35316ed1"
dependencies = [
"memchr",
]
[[package]]
name = "typeable"
version = "0.1.2"
@ -7688,6 +7769,37 @@ dependencies = [
"try-lock",
]
[[package]]
name = "warp"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed7b8be92646fc3d18b06147664ebc5f48d222686cb11a8755e561a735aacc6d"
dependencies = [
"bytes 1.3.0",
"futures-channel",
"futures-util",
"headers",
"http",
"hyper 0.14.23",
"log 0.4.17",
"mime 0.3.16",
"mime_guess",
"multipart",
"percent-encoding 2.2.0",
"pin-project",
"rustls-pemfile 0.2.1",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-stream",
"tokio-tungstenite 0.17.2",
"tokio-util 0.7.2",
"tower-service",
"tracing",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
@ -8076,6 +8188,19 @@ dependencies = [
"time 0.3.17",
]
[[package]]
name = "yellowstone-grpc-proto"
version = "1.0.1+solana.1.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f80b9fe53333bb46c7dec611089faef2553ad6ced1c6dde52d78d2eaaf1db5c"
dependencies = [
"anyhow",
"prost",
"protobuf-src",
"tonic",
"tonic-build",
]
[[package]]
name = "zeroize"
version = "1.3.0"

View File

@ -28,6 +28,7 @@ solana-client = "~1.14.9"
solana-rpc = "~1.14.9"
solana-sdk = "~1.14.9"
solana-address-lookup-table-program = "~1.14.9"
mango-feeds-connector = "0.1.0"
spl-associated-token-account = "1.0.3"
thiserror = "1.0.31"
log = "0.4"

View File

@ -44,9 +44,10 @@ impl Message {
trace!("websocket account message");
chain.update_account(
account_write.pubkey,
AccountAndSlot {
AccountData {
slot: account_write.slot,
account: account_write.account.clone(),
write_version: 1,
},
);
}
@ -54,9 +55,10 @@ impl Message {
for account_update in snapshot {
chain.update_account(
account_update.pubkey,
chain_data::AccountAndSlot {
chain_data::AccountData {
slot: account_update.slot,
account: account_update.account.clone(),
write_version: 0,
},
);
}

View File

@ -1,248 +1,2 @@
use {
solana_sdk::account::AccountSharedData, solana_sdk::pubkey::Pubkey, std::collections::HashMap,
};
pub use crate::chain_data_fetcher::AccountFetcher;
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SlotStatus {
Rooted,
Confirmed,
Processed,
}
#[derive(Clone, Debug)]
pub struct SlotData {
pub slot: u64,
pub parent: Option<u64>,
pub status: SlotStatus,
pub chain: u64, // the top slot that this is in a chain with. uncles will have values < tip
}
#[derive(Clone, Debug)]
pub struct AccountAndSlot {
pub slot: u64,
pub account: AccountSharedData,
}
/// Track slots and account writes
///
/// - use account() to retrieve the current best data for an account.
/// - update_from_snapshot() and update_from_websocket() update the state for new messages
pub struct ChainData {
/// only slots >= newest_rooted_slot are retained
slots: HashMap<u64, SlotData>,
/// writes to accounts, only the latest rooted write an newer are retained
accounts: HashMap<Pubkey, Vec<AccountAndSlot>>,
newest_rooted_slot: u64,
newest_processed_slot: u64,
best_chain_slot: u64,
}
impl ChainData {
pub fn new() -> Self {
Self {
slots: HashMap::new(),
accounts: HashMap::new(),
newest_rooted_slot: 0,
newest_processed_slot: 0,
best_chain_slot: 0,
}
}
pub fn update_slot(&mut self, new_slot: SlotData) {
let new_processed_head = new_slot.slot > self.newest_processed_slot;
if new_processed_head {
self.newest_processed_slot = new_slot.slot;
}
let new_rooted_head =
new_slot.slot > self.newest_rooted_slot && new_slot.status == SlotStatus::Rooted;
if new_rooted_head {
self.newest_rooted_slot = new_slot.slot;
}
// Use the highest slot that has a known parent as best chain
// (sometimes slots OptimisticallyConfirm before we even know the parent!)
let new_best_chain = new_slot.parent.is_some() && new_slot.slot > self.best_chain_slot;
if new_best_chain {
self.best_chain_slot = new_slot.slot;
}
let mut parent_update = false;
use std::collections::hash_map::Entry;
match self.slots.entry(new_slot.slot) {
Entry::Vacant(v) => {
v.insert(new_slot);
}
Entry::Occupied(o) => {
let v = o.into_mut();
parent_update = v.parent != new_slot.parent && new_slot.parent.is_some();
v.parent = v.parent.or(new_slot.parent);
if v.status == SlotStatus::Processed || new_slot.status == SlotStatus::Rooted {
v.status = new_slot.status;
}
}
};
if new_best_chain || parent_update {
// update the "chain" field down to the first rooted slot
let mut slot = self.best_chain_slot;
loop {
if let Some(data) = self.slots.get_mut(&slot) {
data.chain = self.best_chain_slot;
if data.status == SlotStatus::Rooted {
break;
}
if let Some(parent) = data.parent {
slot = parent;
continue;
}
}
break;
}
}
if new_rooted_head {
// for each account, preserve only writes > newest_rooted_slot, or the newest
// rooted write
for (_, writes) in self.accounts.iter_mut() {
let newest_rooted_write = writes
.iter()
.rev()
.find(|w| {
w.slot <= self.newest_rooted_slot
&& self
.slots
.get(&w.slot)
.map(|s| {
// sometimes we seem not to get notifications about slots
// getting rooted, hence assume non-uncle slots < newest_rooted_slot
// are rooted too
s.status == SlotStatus::Rooted
|| s.chain == self.best_chain_slot
})
// preserved account writes for deleted slots <= newest_rooted_slot
// are expected to be rooted
.unwrap_or(true)
})
.map(|w| w.slot)
// no rooted write found: produce no effect, since writes > newest_rooted_slot are retained anyway
.unwrap_or(self.newest_rooted_slot + 1);
writes
.retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot);
}
// now it's fine to drop any slots before the new rooted head
// as account writes for non-rooted slots before it have been dropped
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
}
}
pub fn update_account(&mut self, pubkey: Pubkey, account: AccountAndSlot) {
use std::collections::hash_map::Entry;
match self.accounts.entry(pubkey) {
Entry::Vacant(v) => {
v.insert(vec![account]);
}
Entry::Occupied(o) => {
let v = o.into_mut();
// v is ordered by slot ascending. find the right position
// overwrite if an entry for the slot already exists, otherwise insert
let rev_pos = v
.iter()
.rev()
.position(|d| d.slot <= account.slot)
.unwrap_or(v.len());
let pos = v.len() - rev_pos;
if pos < v.len() && v[pos].slot == account.slot {
v[pos] = account;
} else {
v.insert(pos, account);
}
}
};
}
pub fn update_from_rpc(&mut self, pubkey: &Pubkey, account: AccountAndSlot) {
// Add a stub slot if the rpc has information about the future.
// If it's in the past, either the slot already exists (and maybe we have
// the data already), or it was a skipped slot and adding it now makes no difference.
if account.slot > self.best_chain_slot {
self.update_slot(SlotData {
slot: account.slot,
parent: Some(self.best_chain_slot),
status: SlotStatus::Processed,
chain: 0,
});
}
self.update_account(*pubkey, account)
}
fn is_account_write_live(&self, write: &AccountAndSlot) -> bool {
self.slots
.get(&write.slot)
// either the slot is rooted, in the current chain or newer than the chain head
.map(|s| {
s.status == SlotStatus::Rooted
|| s.chain == self.best_chain_slot
|| write.slot > self.best_chain_slot
})
// if the slot can't be found but preceeds newest rooted, use it too (old rooted slots are removed)
.unwrap_or(write.slot <= self.newest_rooted_slot)
}
/// Cloned snapshot of all the most recent live writes per pubkey
pub fn accounts_snapshot(&self) -> HashMap<Pubkey, AccountAndSlot> {
self.accounts
.iter()
.filter_map(|(pubkey, writes)| {
let latest_good_write = writes
.iter()
.rev()
.find(|w| self.is_account_write_live(w))?;
Some((*pubkey, latest_good_write.clone()))
})
.collect()
}
/// Ref to the most recent live write of the pubkey
pub fn account<'a>(&'a self, pubkey: &Pubkey) -> anyhow::Result<&'a AccountSharedData> {
self.account_and_slot(pubkey).map(|w| &w.account)
}
/// Ref to the most recent live write of the pubkey, along with the slot that it was for
pub fn account_and_slot<'a>(&'a self, pubkey: &Pubkey) -> anyhow::Result<&'a AccountAndSlot> {
self.accounts
.get(pubkey)
.ok_or_else(|| anyhow::anyhow!("account {} not found", pubkey))?
.iter()
.rev()
.find(|w| self.is_account_write_live(w))
.ok_or_else(|| anyhow::anyhow!("account {} has no live data", pubkey))
}
pub fn iter_accounts<'a>(&'a self) -> impl Iterator<Item = (&'a Pubkey, &'a AccountAndSlot)> {
self.accounts.iter().filter_map(|(pk, writes)| {
writes
.iter()
.rev()
.find(|w| self.is_account_write_live(w))
.map(|latest_write| (pk, latest_write))
})
}
pub fn slots_count(&self) -> usize {
self.slots.len()
}
pub fn accounts_count(&self) -> usize {
self.accounts.len()
}
pub fn account_writes_count(&self) -> usize {
self.accounts.values().map(|v| v.len()).sum()
}
}
pub use mango_feeds_connector::chain_data::*;

View File

@ -75,8 +75,8 @@ impl AccountFetcher {
let chain_data = self.chain_data.read().unwrap();
Ok(chain_data
.account(address)
.with_context(|| format!("fetch account {} via chain_data", address))?
.clone())
.map(|d| d.account.clone())
.with_context(|| format!("fetch account {} via chain_data", address))?)
}
pub async fn refresh_account_via_rpc(&self, address: &Pubkey) -> anyhow::Result<Slot> {
@ -92,11 +92,25 @@ impl AccountFetcher {
.with_context(|| format!("refresh account {} via rpc", address))?;
let mut chain_data = self.chain_data.write().unwrap();
chain_data.update_from_rpc(
address,
AccountAndSlot {
slot: response.context.slot,
let best_chain_slot = chain_data.best_chain_slot();
// The RPC can get information for slots that haven't been seen yet on chaindata. That means
// that the rpc thinks that slot is valid. Make it so by telling chain data about it.
if best_chain_slot < slot {
chain_data.update_slot(SlotData {
slot,
parent: Some(best_chain_slot),
status: SlotStatus::Processed,
chain: 0,
});
}
chain_data.update_account(
*address,
AccountData {
slot,
account: account.into(),
write_version: 1,
},
);