Separate out shared code into a "data-streams" crate
This commit is contained in:
parent
8ee6c6ac10
commit
d3fec673cf
|
@ -5843,6 +5843,47 @@ dependencies = [
|
|||
"syn 1.0.107",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "solana-geyser-connector-data-streams"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arrayref",
|
||||
"async-channel",
|
||||
"async-stream 0.2.1",
|
||||
"async-trait",
|
||||
"base64 0.9.3",
|
||||
"bs58 0.3.1",
|
||||
"bytemuck",
|
||||
"bytes 1.4.0",
|
||||
"chrono",
|
||||
"fixed",
|
||||
"futures 0.3.26",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"itertools 0.10.5",
|
||||
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"jsonrpc-core-client",
|
||||
"log 0.4.17",
|
||||
"native-tls",
|
||||
"prost 0.9.0",
|
||||
"rand 0.7.3",
|
||||
"rustls 0.20.8",
|
||||
"rustls-pemfile 1.0.2",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
"solana-account-decoder",
|
||||
"solana-client",
|
||||
"solana-rpc",
|
||||
"solana-sdk",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.6.2",
|
||||
"tonic-build 0.6.2",
|
||||
"warp",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "solana-geyser-connector-lib"
|
||||
version = "0.1.0"
|
||||
|
@ -5881,6 +5922,7 @@ dependencies = [
|
|||
"serum_dex 0.5.10 (git+https://github.com/jup-ag/openbook-program?branch=feat/expose-things)",
|
||||
"solana-account-decoder",
|
||||
"solana-client",
|
||||
"solana-geyser-connector-data-streams",
|
||||
"solana-rpc",
|
||||
"solana-sdk",
|
||||
"tokio",
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
[workspace]
|
||||
members = [
|
||||
"data-streams",
|
||||
"lib",
|
||||
"service-mango-crank",
|
||||
"service-mango-fills",
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
[package]
|
||||
name = "solana-geyser-connector-data-streams"
|
||||
version = "0.1.0"
|
||||
authors = ["Christian Kamm <mail@ckamm.de>"]
|
||||
edition = "2021"
|
||||
|
||||
[lib]
|
||||
|
||||
|
||||
[dependencies]
|
||||
jsonrpc-core = "18.0.0"
|
||||
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
|
||||
|
||||
solana-rpc = "~1.14.9"
|
||||
solana-client = "~1.14.9"
|
||||
solana-account-decoder = "~1.14.9"
|
||||
solana-sdk = "~1.14.9"
|
||||
|
||||
arrayref = "*"
|
||||
bytemuck = "*"
|
||||
fixed = { version = "*", features = ["serde"] }
|
||||
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-stream = "0.1"
|
||||
native-tls = "0.2"
|
||||
rustls = "0.20.8"
|
||||
rustls-pemfile = "1.0.2"
|
||||
|
||||
serde = "1.0.130"
|
||||
serde_derive = "1.0.130"
|
||||
serde_json = "1.0.68"
|
||||
|
||||
tonic = { version = "0.6", features = ["tls", "compression"] }
|
||||
prost = "0.9"
|
||||
|
||||
bs58 = "*"
|
||||
base64 = "*"
|
||||
log = "0.4"
|
||||
rand = "0.7"
|
||||
anyhow = "1.0"
|
||||
bytes = "1.0"
|
||||
itertools = "0.10.5"
|
||||
chrono = "0.4.23"
|
||||
|
||||
futures = "0.3.17"
|
||||
futures-core = "0.3"
|
||||
futures-util = "0.3"
|
||||
|
||||
async-stream = "0.2"
|
||||
async-channel = "1.6"
|
||||
async-trait = "0.1"
|
||||
|
||||
warp = "0.3"
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = { version = "0.6", features = ["compression"] }
|
||||
|
|
@ -4,9 +4,8 @@ use crate::{
|
|||
AccountWrite, SlotUpdate,
|
||||
};
|
||||
|
||||
use anchor_lang::prelude::Pubkey;
|
||||
use async_trait::async_trait;
|
||||
use solana_sdk::{account::WritableAccount, stake_history::Epoch};
|
||||
use solana_sdk::{account::WritableAccount, stake_history::Epoch, pubkey::Pubkey};
|
||||
use std::{
|
||||
collections::{BTreeSet, HashMap},
|
||||
sync::Arc,
|
|
@ -37,7 +37,7 @@ pub use solana::storage::confirmed_block::*;
|
|||
use crate::FilterConfig;
|
||||
use crate::{
|
||||
metrics::{MetricType, Metrics},
|
||||
AccountWrite, AnyhowWrap, GrpcSourceConfig, SlotStatus, SlotUpdate, SnapshotSourceConfig,
|
||||
AccountWrite, AnyhowWrap, GrpcSourceConfig, chain_data::SlotStatus, SlotUpdate, SnapshotSourceConfig,
|
||||
SourceConfig, TlsConfig,
|
||||
};
|
||||
|
|
@ -0,0 +1,102 @@
|
|||
pub mod account_write_filter;
|
||||
pub mod chain_data;
|
||||
pub mod grpc_plugin_source;
|
||||
pub mod websocket_source;
|
||||
pub mod metrics;
|
||||
|
||||
use {
|
||||
serde_derive::Deserialize,
|
||||
solana_sdk::{account::Account, pubkey::Pubkey},
|
||||
};
|
||||
|
||||
trait AnyhowWrap {
|
||||
type Value;
|
||||
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
|
||||
}
|
||||
|
||||
impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
|
||||
type Value = T;
|
||||
fn map_err_anyhow(self) -> anyhow::Result<Self::Value> {
|
||||
self.map_err(|err| anyhow::anyhow!("{:?}", err))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Debug)]
|
||||
pub struct AccountWrite {
|
||||
pub pubkey: Pubkey,
|
||||
pub slot: u64,
|
||||
pub write_version: u64,
|
||||
pub lamports: u64,
|
||||
pub owner: Pubkey,
|
||||
pub executable: bool,
|
||||
pub rent_epoch: u64,
|
||||
pub data: Vec<u8>,
|
||||
pub is_selected: bool,
|
||||
}
|
||||
|
||||
impl AccountWrite {
|
||||
fn from(pubkey: Pubkey, slot: u64, write_version: u64, account: Account) -> AccountWrite {
|
||||
AccountWrite {
|
||||
pubkey,
|
||||
slot: slot,
|
||||
write_version,
|
||||
lamports: account.lamports,
|
||||
owner: account.owner,
|
||||
executable: account.executable,
|
||||
rent_epoch: account.rent_epoch,
|
||||
data: account.data,
|
||||
is_selected: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SlotUpdate {
|
||||
pub slot: u64,
|
||||
pub parent: Option<u64>,
|
||||
pub status: chain_data::SlotStatus,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct TlsConfig {
|
||||
pub ca_cert_path: String,
|
||||
pub client_cert_path: String,
|
||||
pub client_key_path: String,
|
||||
pub domain_name: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct GrpcSourceConfig {
|
||||
pub name: String,
|
||||
pub connection_string: String,
|
||||
pub retry_connection_sleep_secs: u64,
|
||||
pub tls: Option<TlsConfig>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct SourceConfig {
|
||||
pub dedup_queue_size: usize,
|
||||
pub grpc_sources: Vec<GrpcSourceConfig>,
|
||||
pub snapshot: SnapshotSourceConfig,
|
||||
pub rpc_ws_url: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct SnapshotSourceConfig {
|
||||
pub rpc_http_url: String,
|
||||
pub program_id: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct FilterConfig {
|
||||
pub program_ids: Vec<String>,
|
||||
pub account_ids: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct MetricsConfig {
|
||||
pub output_stdout: bool,
|
||||
pub output_http: bool,
|
||||
// TODO: add configurable port and endpoint url
|
||||
// TODO: add configurable write interval
|
||||
}
|
|
@ -16,7 +16,7 @@ use std::{
|
|||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use crate::{AccountWrite, AnyhowWrap, SlotStatus, SlotUpdate, SourceConfig};
|
||||
use crate::{AccountWrite, AnyhowWrap, chain_data::SlotStatus, SlotUpdate, SourceConfig};
|
||||
|
||||
enum WebsocketMessage {
|
||||
SingleUpdate(Response<RpcKeyedAccount>),
|
|
@ -8,6 +8,8 @@ edition = "2021"
|
|||
|
||||
|
||||
[dependencies]
|
||||
solana-geyser-connector-data-streams = { file = "0.1.0", path = "../data-streams" }
|
||||
|
||||
jsonrpc-core = "18.0.0"
|
||||
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
|
||||
|
||||
|
@ -16,7 +18,7 @@ solana-client = "~1.14.9"
|
|||
solana-account-decoder = "~1.14.9"
|
||||
solana-sdk = "~1.14.9"
|
||||
|
||||
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
|
||||
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev", features = ["client"] }
|
||||
arrayref = "*"
|
||||
bytemuck = "*"
|
||||
fixed = { version = "*", features = ["serde"] }
|
||||
|
@ -62,7 +64,7 @@ warp = "0.3"
|
|||
|
||||
anchor-lang = "0.25.0"
|
||||
|
||||
serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things" }
|
||||
serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things", features = ["no-entrypoint"] }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = { version = "0.6", features = ["compression"] }
|
||||
|
|
105
lib/src/lib.rs
105
lib/src/lib.rs
|
@ -1,71 +1,14 @@
|
|||
|
||||
pub mod account_write_filter;
|
||||
pub mod chain_data;
|
||||
pub mod fill_event_filter;
|
||||
pub mod fill_event_postgres_target;
|
||||
pub mod grpc_plugin_source;
|
||||
pub mod memory_target;
|
||||
pub mod metrics;
|
||||
pub mod orderbook_filter;
|
||||
pub mod postgres_types_numeric;
|
||||
pub mod serum;
|
||||
pub mod websocket_source;
|
||||
|
||||
pub use chain_data::SlotStatus;
|
||||
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
||||
use serde_derive::Deserialize;
|
||||
|
||||
use {
|
||||
serde_derive::Deserialize,
|
||||
solana_sdk::{account::Account, pubkey::Pubkey},
|
||||
};
|
||||
|
||||
trait AnyhowWrap {
|
||||
type Value;
|
||||
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
|
||||
}
|
||||
|
||||
impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
|
||||
type Value = T;
|
||||
fn map_err_anyhow(self) -> anyhow::Result<Self::Value> {
|
||||
self.map_err(|err| anyhow::anyhow!("{:?}", err))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Debug)]
|
||||
pub struct AccountWrite {
|
||||
pub pubkey: Pubkey,
|
||||
pub slot: u64,
|
||||
pub write_version: u64,
|
||||
pub lamports: u64,
|
||||
pub owner: Pubkey,
|
||||
pub executable: bool,
|
||||
pub rent_epoch: u64,
|
||||
pub data: Vec<u8>,
|
||||
pub is_selected: bool,
|
||||
}
|
||||
|
||||
impl AccountWrite {
|
||||
fn from(pubkey: Pubkey, slot: u64, write_version: u64, account: Account) -> AccountWrite {
|
||||
AccountWrite {
|
||||
pubkey,
|
||||
slot: slot,
|
||||
write_version,
|
||||
lamports: account.lamports,
|
||||
owner: account.owner,
|
||||
executable: account.executable,
|
||||
rent_epoch: account.rent_epoch,
|
||||
data: account.data,
|
||||
is_selected: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SlotUpdate {
|
||||
pub slot: u64,
|
||||
pub parent: Option<u64>,
|
||||
pub status: chain_data::SlotStatus,
|
||||
}
|
||||
pub use solana_geyser_connector_data_streams::*;
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct PostgresConfig {
|
||||
|
@ -97,36 +40,6 @@ pub struct PostgresTlsConfig {
|
|||
pub client_key_path: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct TlsConfig {
|
||||
pub ca_cert_path: String,
|
||||
pub client_cert_path: String,
|
||||
pub client_key_path: String,
|
||||
pub domain_name: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct GrpcSourceConfig {
|
||||
pub name: String,
|
||||
pub connection_string: String,
|
||||
pub retry_connection_sleep_secs: u64,
|
||||
pub tls: Option<TlsConfig>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct SourceConfig {
|
||||
pub dedup_queue_size: usize,
|
||||
pub grpc_sources: Vec<GrpcSourceConfig>,
|
||||
pub snapshot: SnapshotSourceConfig,
|
||||
pub rpc_ws_url: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct FilterConfig {
|
||||
pub program_ids: Vec<String>,
|
||||
pub account_ids: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct StatusResponse<'a> {
|
||||
pub success: bool,
|
||||
|
@ -146,20 +59,6 @@ impl<'a> Serialize for StatusResponse<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct SnapshotSourceConfig {
|
||||
pub rpc_http_url: String,
|
||||
pub program_id: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct MetricsConfig {
|
||||
pub output_stdout: bool,
|
||||
pub output_http: bool,
|
||||
// TODO: add configurable port and endpoint url
|
||||
// TODO: add configurable write interval
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct Config {
|
||||
pub postgres_target: PostgresConfig,
|
||||
|
|
Loading…
Reference in New Issue