diff --git a/Cargo.lock b/Cargo.lock index a1fca79..50c02fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5843,6 +5843,47 @@ dependencies = [ "syn 1.0.107", ] +[[package]] +name = "solana-geyser-connector-data-streams" +version = "0.1.0" +dependencies = [ + "anyhow", + "arrayref", + "async-channel", + "async-stream 0.2.1", + "async-trait", + "base64 0.9.3", + "bs58 0.3.1", + "bytemuck", + "bytes 1.4.0", + "chrono", + "fixed", + "futures 0.3.26", + "futures-core", + "futures-util", + "itertools 0.10.5", + "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "jsonrpc-core-client", + "log 0.4.17", + "native-tls", + "prost 0.9.0", + "rand 0.7.3", + "rustls 0.20.8", + "rustls-pemfile 1.0.2", + "serde", + "serde_derive", + "serde_json", + "solana-account-decoder", + "solana-client", + "solana-rpc", + "solana-sdk", + "tokio", + "tokio-stream", + "tonic 0.6.2", + "tonic-build 0.6.2", + "warp", +] + [[package]] name = "solana-geyser-connector-lib" version = "0.1.0" @@ -5881,6 +5922,7 @@ dependencies = [ "serum_dex 0.5.10 (git+https://github.com/jup-ag/openbook-program?branch=feat/expose-things)", "solana-account-decoder", "solana-client", + "solana-geyser-connector-data-streams", "solana-rpc", "solana-sdk", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 9d1fc8a..2634096 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ + "data-streams", "lib", "service-mango-crank", "service-mango-fills", diff --git a/data-streams/Cargo.toml b/data-streams/Cargo.toml new file mode 100644 index 0000000..b8d9aa0 --- /dev/null +++ b/data-streams/Cargo.toml @@ -0,0 +1,57 @@ +[package] +name = "solana-geyser-connector-data-streams" +version = "0.1.0" +authors = ["Christian Kamm "] +edition = "2021" + +[lib] + + +[dependencies] +jsonrpc-core = "18.0.0" +jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] } + +solana-rpc = "~1.14.9" +solana-client = "~1.14.9" +solana-account-decoder = "~1.14.9" +solana-sdk = "~1.14.9" + +arrayref = "*" +bytemuck = "*" +fixed = { version = "*", features = ["serde"] } + +tokio = { version = "1", features = ["full"] } +tokio-stream = "0.1" +native-tls = "0.2" +rustls = "0.20.8" +rustls-pemfile = "1.0.2" + +serde = "1.0.130" +serde_derive = "1.0.130" +serde_json = "1.0.68" + +tonic = { version = "0.6", features = ["tls", "compression"] } +prost = "0.9" + +bs58 = "*" +base64 = "*" +log = "0.4" +rand = "0.7" +anyhow = "1.0" +bytes = "1.0" +itertools = "0.10.5" +chrono = "0.4.23" + +futures = "0.3.17" +futures-core = "0.3" +futures-util = "0.3" + +async-stream = "0.2" +async-channel = "1.6" +async-trait = "0.1" + +warp = "0.3" + +[build-dependencies] +tonic-build = { version = "0.6", features = ["compression"] } + diff --git a/lib/build.rs b/data-streams/build.rs similarity index 100% rename from lib/build.rs rename to data-streams/build.rs diff --git a/lib/src/account_write_filter.rs b/data-streams/src/account_write_filter.rs similarity index 98% rename from lib/src/account_write_filter.rs rename to data-streams/src/account_write_filter.rs index c58d7c8..3429357 100644 --- a/lib/src/account_write_filter.rs +++ b/data-streams/src/account_write_filter.rs @@ -4,9 +4,8 @@ use crate::{ AccountWrite, SlotUpdate, }; -use anchor_lang::prelude::Pubkey; use async_trait::async_trait; -use solana_sdk::{account::WritableAccount, stake_history::Epoch}; +use solana_sdk::{account::WritableAccount, stake_history::Epoch, pubkey::Pubkey}; use std::{ collections::{BTreeSet, HashMap}, sync::Arc, diff --git a/lib/src/chain_data.rs b/data-streams/src/chain_data.rs similarity index 100% rename from lib/src/chain_data.rs rename to data-streams/src/chain_data.rs diff --git a/lib/src/grpc_plugin_source.rs b/data-streams/src/grpc_plugin_source.rs similarity index 99% rename from lib/src/grpc_plugin_source.rs rename to data-streams/src/grpc_plugin_source.rs index 864981d..d2f9674 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/data-streams/src/grpc_plugin_source.rs @@ -37,7 +37,7 @@ pub use solana::storage::confirmed_block::*; use crate::FilterConfig; use crate::{ metrics::{MetricType, Metrics}, - AccountWrite, AnyhowWrap, GrpcSourceConfig, SlotStatus, SlotUpdate, SnapshotSourceConfig, + AccountWrite, AnyhowWrap, GrpcSourceConfig, chain_data::SlotStatus, SlotUpdate, SnapshotSourceConfig, SourceConfig, TlsConfig, }; diff --git a/data-streams/src/lib.rs b/data-streams/src/lib.rs new file mode 100644 index 0000000..1068f31 --- /dev/null +++ b/data-streams/src/lib.rs @@ -0,0 +1,102 @@ +pub mod account_write_filter; +pub mod chain_data; +pub mod grpc_plugin_source; +pub mod websocket_source; +pub mod metrics; + +use { + serde_derive::Deserialize, + solana_sdk::{account::Account, pubkey::Pubkey}, +}; + +trait AnyhowWrap { + type Value; + fn map_err_anyhow(self) -> anyhow::Result; +} + +impl AnyhowWrap for Result { + type Value = T; + fn map_err_anyhow(self) -> anyhow::Result { + self.map_err(|err| anyhow::anyhow!("{:?}", err)) + } +} + +#[derive(Clone, PartialEq, Debug)] +pub struct AccountWrite { + pub pubkey: Pubkey, + pub slot: u64, + pub write_version: u64, + pub lamports: u64, + pub owner: Pubkey, + pub executable: bool, + pub rent_epoch: u64, + pub data: Vec, + pub is_selected: bool, +} + +impl AccountWrite { + fn from(pubkey: Pubkey, slot: u64, write_version: u64, account: Account) -> AccountWrite { + AccountWrite { + pubkey, + slot: slot, + write_version, + lamports: account.lamports, + owner: account.owner, + executable: account.executable, + rent_epoch: account.rent_epoch, + data: account.data, + is_selected: true, + } + } +} + +#[derive(Clone, Debug)] +pub struct SlotUpdate { + pub slot: u64, + pub parent: Option, + pub status: chain_data::SlotStatus, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct TlsConfig { + pub ca_cert_path: String, + pub client_cert_path: String, + pub client_key_path: String, + pub domain_name: String, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct GrpcSourceConfig { + pub name: String, + pub connection_string: String, + pub retry_connection_sleep_secs: u64, + pub tls: Option, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct SourceConfig { + pub dedup_queue_size: usize, + pub grpc_sources: Vec, + pub snapshot: SnapshotSourceConfig, + pub rpc_ws_url: String, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct SnapshotSourceConfig { + pub rpc_http_url: String, + pub program_id: String, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct FilterConfig { + pub program_ids: Vec, + pub account_ids: Vec, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct MetricsConfig { + pub output_stdout: bool, + pub output_http: bool, + // TODO: add configurable port and endpoint url + // TODO: add configurable write interval +} \ No newline at end of file diff --git a/lib/src/metrics.rs b/data-streams/src/metrics.rs similarity index 100% rename from lib/src/metrics.rs rename to data-streams/src/metrics.rs diff --git a/lib/src/websocket_source.rs b/data-streams/src/websocket_source.rs similarity index 98% rename from lib/src/websocket_source.rs rename to data-streams/src/websocket_source.rs index 765f21f..5815158 100644 --- a/lib/src/websocket_source.rs +++ b/data-streams/src/websocket_source.rs @@ -16,7 +16,7 @@ use std::{ time::{Duration, Instant}, }; -use crate::{AccountWrite, AnyhowWrap, SlotStatus, SlotUpdate, SourceConfig}; +use crate::{AccountWrite, AnyhowWrap, chain_data::SlotStatus, SlotUpdate, SourceConfig}; enum WebsocketMessage { SingleUpdate(Response), diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 839e35e..5924fcc 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -8,6 +8,8 @@ edition = "2021" [dependencies] +solana-geyser-connector-data-streams = { file = "0.1.0", path = "../data-streams" } + jsonrpc-core = "18.0.0" jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] } @@ -16,7 +18,7 @@ solana-client = "~1.14.9" solana-account-decoder = "~1.14.9" solana-sdk = "~1.14.9" -mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" } +mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev", features = ["client"] } arrayref = "*" bytemuck = "*" fixed = { version = "*", features = ["serde"] } @@ -62,7 +64,7 @@ warp = "0.3" anchor-lang = "0.25.0" -serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things" } +serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things", features = ["no-entrypoint"] } [build-dependencies] tonic-build = { version = "0.6", features = ["compression"] } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 872f115..d78c670 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,71 +1,14 @@ - -pub mod account_write_filter; -pub mod chain_data; pub mod fill_event_filter; pub mod fill_event_postgres_target; -pub mod grpc_plugin_source; pub mod memory_target; -pub mod metrics; pub mod orderbook_filter; pub mod postgres_types_numeric; pub mod serum; -pub mod websocket_source; -pub use chain_data::SlotStatus; use serde::{ser::SerializeStruct, Serialize, Serializer}; +use serde_derive::Deserialize; -use { - serde_derive::Deserialize, - solana_sdk::{account::Account, pubkey::Pubkey}, -}; - -trait AnyhowWrap { - type Value; - fn map_err_anyhow(self) -> anyhow::Result; -} - -impl AnyhowWrap for Result { - type Value = T; - fn map_err_anyhow(self) -> anyhow::Result { - self.map_err(|err| anyhow::anyhow!("{:?}", err)) - } -} - -#[derive(Clone, PartialEq, Debug)] -pub struct AccountWrite { - pub pubkey: Pubkey, - pub slot: u64, - pub write_version: u64, - pub lamports: u64, - pub owner: Pubkey, - pub executable: bool, - pub rent_epoch: u64, - pub data: Vec, - pub is_selected: bool, -} - -impl AccountWrite { - fn from(pubkey: Pubkey, slot: u64, write_version: u64, account: Account) -> AccountWrite { - AccountWrite { - pubkey, - slot: slot, - write_version, - lamports: account.lamports, - owner: account.owner, - executable: account.executable, - rent_epoch: account.rent_epoch, - data: account.data, - is_selected: true, - } - } -} - -#[derive(Clone, Debug)] -pub struct SlotUpdate { - pub slot: u64, - pub parent: Option, - pub status: chain_data::SlotStatus, -} +pub use solana_geyser_connector_data_streams::*; #[derive(Clone, Debug, Deserialize)] pub struct PostgresConfig { @@ -97,36 +40,6 @@ pub struct PostgresTlsConfig { pub client_key_path: String, } -#[derive(Clone, Debug, Deserialize)] -pub struct TlsConfig { - pub ca_cert_path: String, - pub client_cert_path: String, - pub client_key_path: String, - pub domain_name: String, -} - -#[derive(Clone, Debug, Deserialize)] -pub struct GrpcSourceConfig { - pub name: String, - pub connection_string: String, - pub retry_connection_sleep_secs: u64, - pub tls: Option, -} - -#[derive(Clone, Debug, Deserialize)] -pub struct SourceConfig { - pub dedup_queue_size: usize, - pub grpc_sources: Vec, - pub snapshot: SnapshotSourceConfig, - pub rpc_ws_url: String, -} - -#[derive(Clone, Debug, Deserialize)] -pub struct FilterConfig { - pub program_ids: Vec, - pub account_ids: Vec, -} - #[derive(Clone, Debug)] pub struct StatusResponse<'a> { pub success: bool, @@ -146,20 +59,6 @@ impl<'a> Serialize for StatusResponse<'a> { } } -#[derive(Clone, Debug, Deserialize)] -pub struct SnapshotSourceConfig { - pub rpc_http_url: String, - pub program_id: String, -} - -#[derive(Clone, Debug, Deserialize)] -pub struct MetricsConfig { - pub output_stdout: bool, - pub output_http: bool, - // TODO: add configurable port and endpoint url - // TODO: add configurable write interval -} - #[derive(Clone, Debug, Deserialize)] pub struct Config { pub postgres_target: PostgresConfig,