2021-11-01 13:48:17 -07:00
|
|
|
mod grpc_plugin_source;
|
2021-11-02 00:55:39 -07:00
|
|
|
mod postgres_target;
|
2021-11-01 13:48:17 -07:00
|
|
|
mod websocket_source;
|
2021-11-01 02:34:25 -07:00
|
|
|
|
2021-11-02 05:22:13 -07:00
|
|
|
use {
|
2021-11-03 09:22:52 -07:00
|
|
|
async_trait::async_trait,
|
2021-11-03 04:17:49 -07:00
|
|
|
log::*,
|
2021-11-02 05:22:13 -07:00
|
|
|
serde_derive::Deserialize,
|
2021-11-02 06:35:45 -07:00
|
|
|
solana_sdk::{account::Account, pubkey::Pubkey},
|
2021-11-03 09:22:52 -07:00
|
|
|
std::{fs::File, io::Read, sync::Arc},
|
2021-11-02 05:22:13 -07:00
|
|
|
};
|
2021-11-01 02:34:25 -07:00
|
|
|
|
|
|
|
trait AnyhowWrap {
|
|
|
|
type Value;
|
|
|
|
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
|
|
|
|
type Value = T;
|
|
|
|
fn map_err_anyhow(self) -> anyhow::Result<Self::Value> {
|
|
|
|
self.map_err(|err| anyhow::anyhow!("{:?}", err))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Clone, PartialEq, Debug)]
|
|
|
|
pub struct AccountWrite {
|
|
|
|
pub pubkey: Pubkey,
|
|
|
|
pub slot: i64,
|
|
|
|
pub write_version: i64,
|
|
|
|
pub lamports: i64,
|
|
|
|
pub owner: Pubkey,
|
|
|
|
pub executable: bool,
|
|
|
|
pub rent_epoch: i64,
|
|
|
|
pub data: Vec<u8>,
|
|
|
|
}
|
|
|
|
|
2021-11-03 09:22:52 -07:00
|
|
|
impl AccountWrite {
|
2021-11-02 06:35:45 -07:00
|
|
|
fn from(pubkey: Pubkey, slot: u64, write_version: i64, account: Account) -> AccountWrite {
|
|
|
|
AccountWrite {
|
|
|
|
pubkey,
|
|
|
|
slot: slot as i64, // TODO: narrowing!
|
|
|
|
write_version,
|
|
|
|
lamports: account.lamports as i64, // TODO: narrowing!
|
|
|
|
owner: account.owner,
|
|
|
|
executable: account.executable,
|
|
|
|
rent_epoch: account.rent_epoch as i64, // TODO: narrowing!
|
|
|
|
data: account.data.clone(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-01 02:34:25 -07:00
|
|
|
#[derive(Clone, PartialEq, Debug)]
|
2021-11-01 13:48:17 -07:00
|
|
|
pub struct SlotUpdate {
|
|
|
|
pub slot: i64,
|
|
|
|
pub parent: Option<i64>,
|
|
|
|
pub status: String,
|
2021-11-01 02:34:25 -07:00
|
|
|
}
|
|
|
|
|
2021-11-02 05:22:13 -07:00
|
|
|
#[derive(Clone, Debug, Deserialize)]
|
|
|
|
pub struct Config {
|
|
|
|
postgres_connection_string: String,
|
|
|
|
grpc_connection_string: String,
|
|
|
|
rpc_http_url: String,
|
|
|
|
rpc_ws_url: String,
|
|
|
|
}
|
|
|
|
|
2021-11-03 09:22:52 -07:00
|
|
|
#[async_trait]
|
|
|
|
pub trait AccountTable: Sync + Send {
|
|
|
|
fn table_name(&self) -> &str;
|
|
|
|
async fn insert_account_write(
|
|
|
|
&self,
|
|
|
|
client: &postgres_query::Caching<tokio_postgres::Client>,
|
|
|
|
account_write: &AccountWrite,
|
|
|
|
) -> Result<(), anyhow::Error>;
|
|
|
|
}
|
|
|
|
|
|
|
|
pub type AccountTables = Vec<Arc<dyn AccountTable>>;
|
|
|
|
|
|
|
|
struct RawAccountTable {}
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
impl AccountTable for RawAccountTable {
|
|
|
|
fn table_name(&self) -> &str {
|
|
|
|
"account_write"
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn insert_account_write(
|
|
|
|
&self,
|
|
|
|
client: &postgres_query::Caching<tokio_postgres::Client>,
|
|
|
|
account_write: &AccountWrite,
|
|
|
|
) -> Result<(), anyhow::Error> {
|
|
|
|
let pubkey: &[u8] = &account_write.pubkey.to_bytes();
|
|
|
|
let owner: &[u8] = &account_write.owner.to_bytes();
|
|
|
|
|
|
|
|
// TODO: should update for same write_version to work with websocket input
|
|
|
|
let query = postgres_query::query!(
|
|
|
|
"
|
|
|
|
INSERT INTO account_write
|
|
|
|
(pubkey, slot, write_version, owner, lamports, executable, rent_epoch, data)
|
|
|
|
VALUES
|
|
|
|
($pubkey, $slot, $write_version, $owner, $lamports, $executable, $rent_epoch, $data)
|
|
|
|
ON CONFLICT (pubkey, slot, write_version) DO NOTHING",
|
|
|
|
pubkey,
|
|
|
|
slot = account_write.slot,
|
|
|
|
write_version = account_write.write_version,
|
|
|
|
owner,
|
|
|
|
lamports = account_write.lamports,
|
|
|
|
executable = account_write.executable,
|
|
|
|
rent_epoch = account_write.rent_epoch,
|
|
|
|
data = account_write.data,
|
|
|
|
);
|
|
|
|
let _ = query.execute(client).await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
struct MangoAccountTable {}
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
impl AccountTable for MangoAccountTable {
|
|
|
|
fn table_name(&self) -> &str {
|
|
|
|
"mango_account_write"
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn insert_account_write(
|
|
|
|
&self,
|
|
|
|
client: &postgres_query::Caching<tokio_postgres::Client>,
|
|
|
|
account_write: &AccountWrite,
|
|
|
|
) -> Result<(), anyhow::Error> {
|
|
|
|
info!("custom fn");
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-01 02:34:25 -07:00
|
|
|
#[tokio::main]
|
2021-11-02 05:22:13 -07:00
|
|
|
async fn main() -> Result<(), anyhow::Error> {
|
|
|
|
let args: Vec<String> = std::env::args().collect();
|
|
|
|
if args.len() < 2 {
|
|
|
|
println!("requires a config file argument");
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
|
|
|
let config: Config = {
|
|
|
|
let mut file = File::open(&args[1])?;
|
|
|
|
let mut contents = String::new();
|
|
|
|
file.read_to_string(&mut contents)?;
|
|
|
|
serde_json::from_str(&contents).unwrap()
|
|
|
|
};
|
|
|
|
|
2021-11-02 04:54:39 -07:00
|
|
|
solana_logger::setup_with_default("info");
|
2021-11-02 00:55:39 -07:00
|
|
|
info!("startup");
|
2021-11-01 02:34:25 -07:00
|
|
|
|
2021-11-03 09:22:52 -07:00
|
|
|
//let custom_account_tables: AccountTables = vec![Arc::new(MangoAccountTable {})];
|
|
|
|
let account_tables: AccountTables = vec![Arc::new(RawAccountTable {})];
|
|
|
|
|
2021-11-01 02:34:25 -07:00
|
|
|
let (account_write_queue_sender, slot_queue_sender) =
|
2021-11-03 09:22:52 -07:00
|
|
|
postgres_target::init(&config.postgres_connection_string, account_tables).await?;
|
2021-11-01 02:34:25 -07:00
|
|
|
|
2021-11-03 04:17:49 -07:00
|
|
|
info!("postgres done");
|
2021-11-01 13:48:17 -07:00
|
|
|
let use_accountsdb = true;
|
|
|
|
if use_accountsdb {
|
2021-11-03 04:17:49 -07:00
|
|
|
grpc_plugin_source::process_events(config, account_write_queue_sender, slot_queue_sender)
|
|
|
|
.await;
|
2021-11-01 13:48:17 -07:00
|
|
|
} else {
|
2021-11-03 04:17:49 -07:00
|
|
|
websocket_source::process_events(config, account_write_queue_sender, slot_queue_sender)
|
|
|
|
.await;
|
2021-11-01 02:34:25 -07:00
|
|
|
}
|
2021-11-02 05:22:13 -07:00
|
|
|
|
|
|
|
Ok(())
|
2021-11-01 02:34:25 -07:00
|
|
|
}
|