Compare commits

...

4 Commits

Author SHA1 Message Date
Riordan Panayides cccc7accd5 Merge remote-tracking branch 'upstream/tokio-postgres' into pan/fly-deploy 2023-05-15 13:05:06 +01:00
dboures c20429bcc5
wip: attempt to use SSL cert 2023-05-14 03:21:34 -05:00
dboures a4bf9a35be
refactor: server uses tokio-postgres instead of sqlx 2023-05-14 03:09:12 -05:00
dboures cead78381d
refactor: worker uses tokio-postgres instead of sqlx 2023-05-14 02:15:10 -05:00
20 changed files with 703 additions and 784 deletions

View File

@ -2,4 +2,7 @@ RPC_URL=http://solana-mainnet-api.rpc-node.com
DATABASE_URL=
SQLX_OFFLINE=true
MAX_PG_POOL_CONNS_WORKER=5
MAX_PG_POOL_CONNS_SERVER=15
MAX_PG_POOL_CONNS_SERVER=15
USE_SSL=false
CA_CERT_PATH=
CLIENT_KEY_PATH=

538
Cargo.lock generated
View File

@ -433,7 +433,7 @@ dependencies = [
"arrayref",
"base64 0.13.1",
"bincode",
"borsh 0.9.3",
"borsh",
"bytemuck",
"solana-program",
"thiserror",
@ -606,15 +606,6 @@ dependencies = [
"syn 1.0.107",
]
[[package]]
name = "atoi"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e"
dependencies = [
"num-traits",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -841,41 +832,18 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15bf3650200d8bffa99015595e10f1fbd17de07abbc25bb067da79e769939bfa"
dependencies = [
"borsh-derive 0.9.3",
"borsh-derive",
"hashbrown 0.11.2",
]
[[package]]
name = "borsh"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40f9ca3698b2e4cb7c15571db0abc5551dca417a21ae8140460b50309bb2cc62"
dependencies = [
"borsh-derive 0.10.2",
"hashbrown 0.12.3",
]
[[package]]
name = "borsh-derive"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6441c552f230375d18e3cc377677914d2ca2b0d36e52129fe15450a2dce46775"
dependencies = [
"borsh-derive-internal 0.9.3",
"borsh-schema-derive-internal 0.9.3",
"proc-macro-crate 0.1.5",
"proc-macro2 1.0.56",
"syn 1.0.107",
]
[[package]]
name = "borsh-derive"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "598b3eacc6db9c3ee57b22707ad8f6a8d2f6d442bfe24ffeb8cbb70ca59e6a35"
dependencies = [
"borsh-derive-internal 0.10.2",
"borsh-schema-derive-internal 0.10.2",
"borsh-derive-internal",
"borsh-schema-derive-internal",
"proc-macro-crate 0.1.5",
"proc-macro2 1.0.56",
"syn 1.0.107",
@ -892,17 +860,6 @@ dependencies = [
"syn 1.0.107",
]
[[package]]
name = "borsh-derive-internal"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186b734fa1c9f6743e90c95d7233c9faab6360d1a96d4ffa19d9cfd1e9350f8a"
dependencies = [
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 1.0.107",
]
[[package]]
name = "borsh-schema-derive-internal"
version = "0.9.3"
@ -914,17 +871,6 @@ dependencies = [
"syn 1.0.107",
]
[[package]]
name = "borsh-schema-derive-internal"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99b7ff1008316626f485991b960ade129253d4034014616b94f309a15366cc49"
dependencies = [
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 1.0.107",
]
[[package]]
name = "brotli"
version = "3.3.4"
@ -984,28 +930,6 @@ dependencies = [
"serde",
]
[[package]]
name = "bytecheck"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13fe11640a23eb24562225322cd3e452b93a3d4091d62fab69c70542fcd17d1f"
dependencies = [
"bytecheck_derive",
"ptr_meta",
"simdutf8",
]
[[package]]
name = "bytecheck_derive"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e31225543cb46f81a7e224762764f4a6a0f097b1db0b175f69e8065efaa42de5"
dependencies = [
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 1.0.107",
]
[[package]]
name = "bytemuck"
version = "1.13.1"
@ -1344,21 +1268,6 @@ dependencies = [
"libc",
]
[[package]]
name = "crc"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53757d12b596c16c78b83458d732a5d1a17ab3f53f2f7412f6fb57cc8a140ab3"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
[[package]]
name = "crc32fast"
version = "1.3.2"
@ -1402,16 +1311,6 @@ dependencies = [
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils 0.8.14",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
@ -1542,6 +1441,40 @@ version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23d8666cb01533c39dde32bcbab8e227b4ed6679b2c925eba05feabea39508fb"
[[package]]
name = "deadpool"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"retain_mut",
"tokio",
]
[[package]]
name = "deadpool-postgres"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "836a24a9d49deefe610b8b60c767a7412e9a931d79a89415cd2d2d71630ca8d7"
dependencies = [
"deadpool",
"log 0.4.17",
"tokio",
"tokio-postgres",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaa37046cc0f6c3cc6090fbdbf73ef0b8ef4cfcc37f6befc0020f63e8cf121e1"
dependencies = [
"tokio",
]
[[package]]
name = "default-env"
version = "0.1.1"
@ -1636,15 +1569,6 @@ dependencies = [
"walkdir",
]
[[package]]
name = "dirs"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-next"
version = "2.0.0"
@ -1655,17 +1579,6 @@ dependencies = [
"dirs-sys-next",
]
[[package]]
name = "dirs-sys"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
dependencies = [
"libc",
"redox_users",
"winapi 0.3.9",
]
[[package]]
name = "dirs-sys-next"
version = "0.1.2"
@ -1717,12 +1630,6 @@ version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]]
name = "dotenvy"
version = "0.15.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03d8c417d7a8cb362e0c37e5d815f5eb7c37f79ff93707329d5a194e42e54ca0"
[[package]]
name = "eager"
version = "0.1.0"
@ -1769,9 +1676,6 @@ name = "either"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
dependencies = [
"serde",
]
[[package]]
name = "encode_unicode"
@ -1893,6 +1797,12 @@ version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fast-math"
version = "0.1.1"
@ -2062,17 +1972,6 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "futures-intrusive"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a604f7a68fbf8103337523b1fadc8ade7361ee3f112f7c680ad179651616aed5"
dependencies = [
"futures-core",
"lock_api 0.4.9",
"parking_lot 0.11.2",
]
[[package]]
name = "futures-io"
version = "0.3.27"
@ -2272,15 +2171,6 @@ dependencies = [
"ahash",
]
[[package]]
name = "hashlink"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69fe1fcf8b4278d860ad0548329f892a3631fb63f82574df68275f34cdbe0ffa"
dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "headers"
version = "0.3.8"
@ -2320,9 +2210,6 @@ name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
@ -2348,27 +2235,12 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "histogram"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669"
[[package]]
name = "hkdf"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437"
dependencies = [
"hmac 0.12.1",
]
[[package]]
name = "hmac"
version = "0.8.1"
@ -3518,17 +3390,20 @@ dependencies = [
"anyhow",
"arrayref",
"async-trait",
"borsh 0.9.3",
"borsh",
"bytemuck",
"chrono",
"deadpool-postgres",
"derive_more",
"dotenv",
"env_logger 0.10.0",
"futures 0.3.27",
"jsonrpc-core-client",
"log 0.4.17",
"native-tls",
"num-traits",
"num_enum 0.6.1",
"postgres-native-tls",
"serde",
"serde_derive",
"serde_json",
@ -3539,9 +3414,9 @@ dependencies = [
"solana-sdk",
"solana-transaction-status",
"spl-token",
"sqlx",
"strum",
"tokio",
"tokio-postgres",
"tokio-stream",
]
@ -3783,6 +3658,24 @@ dependencies = [
"indexmap",
]
[[package]]
name = "phf"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "928c6535de93548188ef63bb7c4036bd415cd8f36ad25af44b9789b2ee72a48c"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1fb5f6f826b772a8d4c0394209441e7d37cbbb967ae9c7e0e8134365c9ee676"
dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.0.12"
@ -3850,6 +3743,49 @@ dependencies = [
"universal-hash",
]
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d442770e2b1e244bb5eb03b31c79b65bb2568f413b899eaba850fa945a65954"
dependencies = [
"futures 0.3.27",
"native-tls",
"tokio",
"tokio-native-tls",
"tokio-postgres",
]
[[package]]
name = "postgres-protocol"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78b7fa9f396f51dffd61546fd8573ee20592287996568e6175ceb0f8699ad75d"
dependencies = [
"base64 0.21.0",
"byteorder",
"bytes 1.3.0",
"fallible-iterator",
"hmac 0.12.1",
"md-5",
"memchr",
"rand 0.8.5",
"sha2 0.10.6",
"stringprep",
]
[[package]]
name = "postgres-types"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6"
dependencies = [
"bytes 1.3.0",
"chrono",
"fallible-iterator",
"postgres-protocol",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -4005,26 +3941,6 @@ dependencies = [
"autotools",
]
[[package]]
name = "ptr_meta"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1"
dependencies = [
"ptr_meta_derive",
]
[[package]]
name = "ptr_meta_derive"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac"
dependencies = [
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 1.0.107",
]
[[package]]
name = "qstring"
version = "0.7.2"
@ -4401,15 +4317,6 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "rend"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "581008d2099240d37fb08d77ad713bcaec2c4d89d50b5b21a8bb1996bbab68ab"
dependencies = [
"bytecheck",
]
[[package]]
name = "reqwest"
version = "0.11.14"
@ -4454,6 +4361,12 @@ dependencies = [
"winreg",
]
[[package]]
name = "retain_mut"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0"
[[package]]
name = "ring"
version = "0.16.20"
@ -4469,31 +4382,6 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "rkyv"
version = "0.7.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c30f1d45d9aa61cbc8cd1eb87705470892289bb2d01943e7803b873a57404dc3"
dependencies = [
"bytecheck",
"hashbrown 0.12.3",
"ptr_meta",
"rend",
"rkyv_derive",
"seahash",
]
[[package]]
name = "rkyv_derive"
version = "0.7.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff26ed6c7c4dfc2aa9480b86a60e3c7233543a270a680e10758a507c5a4ce476"
dependencies = [
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 1.0.107",
]
[[package]]
name = "rocksdb"
version = "0.19.0"
@ -4516,24 +4404,6 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "rust_decimal"
version = "1.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13cf35f7140155d02ba4ec3294373d513a3c7baa8364c162b030e33c61520a8"
dependencies = [
"arrayvec",
"borsh 0.10.2",
"bytecheck",
"byteorder",
"bytes 1.3.0",
"num-traits",
"rand 0.8.5",
"rkyv",
"serde",
"serde_json",
]
[[package]]
name = "rustc-demangle"
version = "0.1.21"
@ -4722,12 +4592,6 @@ dependencies = [
"untrusted",
]
[[package]]
name = "seahash"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
[[package]]
name = "security-framework"
version = "2.8.0"
@ -5003,18 +4867,18 @@ version = "1.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c"
[[package]]
name = "simdutf8"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
[[package]]
name = "simpl"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a30f10c911c0355f80f1c2faa8096efc4a58cdf8590b954d5b395efa071c711"
[[package]]
name = "siphasher"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "sized-chunks"
version = "0.6.5"
@ -5623,8 +5487,8 @@ dependencies = [
"bincode",
"bitflags",
"blake3",
"borsh 0.9.3",
"borsh-derive 0.9.3",
"borsh",
"borsh-derive",
"bs58 0.4.0",
"bv",
"bytemuck",
@ -5842,7 +5706,7 @@ dependencies = [
"base64 0.13.1",
"bincode",
"bitflags",
"borsh 0.9.3",
"borsh",
"bs58 0.4.0",
"bytemuck",
"byteorder",
@ -6046,7 +5910,7 @@ dependencies = [
"Inflector",
"base64 0.13.1",
"bincode",
"borsh 0.9.3",
"borsh",
"bs58 0.4.0",
"lazy_static",
"log 0.4.17",
@ -6196,7 +6060,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbc000f0fdf1f12f99d77d398137c1751345b18c88258ce0f99b7872cf6c9bd6"
dependencies = [
"assert_matches",
"borsh 0.9.3",
"borsh",
"num-derive",
"num-traits",
"solana-program",
@ -6247,114 +6111,6 @@ dependencies = [
"thiserror",
]
[[package]]
name = "sqlformat"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e"
dependencies = [
"itertools",
"nom",
"unicode_categories",
]
[[package]]
name = "sqlx"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9249290c05928352f71c077cc44a464d880c63f26f7534728cca008e135c0428"
dependencies = [
"sqlx-core",
"sqlx-macros",
]
[[package]]
name = "sqlx-core"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcbc16ddba161afc99e14d1713a453747a2b07fc097d2009f4c300ec99286105"
dependencies = [
"ahash",
"atoi",
"base64 0.13.1",
"bitflags",
"byteorder",
"bytes 1.3.0",
"chrono",
"crc",
"crossbeam-queue",
"dirs",
"dotenvy",
"either",
"event-listener",
"futures-channel",
"futures-core",
"futures-intrusive",
"futures-util",
"hashlink",
"hex",
"hkdf",
"hmac 0.12.1",
"indexmap",
"itoa",
"libc",
"log 0.4.17",
"md-5",
"memchr",
"num-bigint 0.4.3",
"once_cell",
"paste",
"percent-encoding 2.2.0",
"rand 0.8.5",
"rust_decimal",
"serde",
"serde_json",
"sha1 0.10.5",
"sha2 0.10.6",
"smallvec 1.10.0",
"sqlformat",
"sqlx-rt",
"stringprep",
"thiserror",
"tokio-stream",
"url 2.3.1",
"whoami",
]
[[package]]
name = "sqlx-macros"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b850fa514dc11f2ee85be9d055c512aa866746adfacd1cb42d867d68e6a5b0d9"
dependencies = [
"dotenvy",
"either",
"heck 0.4.0",
"hex",
"once_cell",
"proc-macro2 1.0.56",
"quote 1.0.26",
"serde",
"serde_json",
"sha2 0.10.6",
"sqlx-core",
"sqlx-rt",
"syn 1.0.107",
"url 2.3.1",
]
[[package]]
name = "sqlx-rt"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24c5b2d25fa654cc5f841750b8e1cdedbe21189bf9a9382ee90bfa9dd3562396"
dependencies = [
"native-tls",
"once_cell",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
@ -6716,6 +6472,30 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29a12c1b3e0704ae7dfc25562629798b29c72e6b1d0a681b6f29ab4ae5e7f7bf"
dependencies = [
"async-trait",
"byteorder",
"bytes 1.3.0",
"fallible-iterator",
"futures-channel",
"futures-util",
"log 0.4.17",
"parking_lot 0.12.1",
"percent-encoding 2.2.0",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"socket2",
"tokio",
"tokio-util 0.7.2",
]
[[package]]
name = "tokio-reactor"
version = "0.1.12"
@ -7108,12 +6888,6 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "unicode_categories"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "universal-hash"
version = "0.4.1"
@ -7421,16 +7195,6 @@ dependencies = [
"once_cell",
]
[[package]]
name = "whoami"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45dbc71f0cdca27dc261a9bd37ddec174e4a0af2b900b890f378460f745426e3"
dependencies = [
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "winapi"
version = "0.2.8"

View File

@ -26,7 +26,10 @@ futures = "0.3.27"
jsonrpc-core-client = { version = "18.0.0", features = ["ws", "http"] }
sqlx = { version = "0.6", features = [ "runtime-tokio-native-tls" , "postgres", "chrono", "decimal", "offline" ] }
deadpool-postgres = { version = "0.10.5", features = [ "rt_tokio_1" ] }
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
postgres-native-tls = "0.5.0"
native-tls = "0.2.11"
chrono = "0.4.23"
solana-client = "=1.14.13"

View File

@ -1,50 +1,54 @@
use chrono::{DateTime, Utc};
use sqlx::{pool::PoolConnection, Postgres};
use crate::{
structs::{
candle::Candle,
coingecko::{PgCoinGecko24HighLow, PgCoinGecko24HourVolume},
openbook::PgOpenBookFill,
resolution::Resolution,
trader::PgTrader,
},
utils::AnyhowWrap,
use crate::structs::{
candle::Candle,
coingecko::{PgCoinGecko24HighLow, PgCoinGecko24HourVolume},
openbook::PgOpenBookFill,
resolution::Resolution,
trader::PgTrader,
};
use chrono::{DateTime, Utc};
use deadpool_postgres::{GenericClient, Pool};
pub async fn fetch_earliest_fill(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_address_string: &str,
) -> anyhow::Result<Option<PgOpenBookFill>> {
sqlx::query_as!(
PgOpenBookFill,
r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
native_qty_paid as "native_qty_paid!",
native_qty_received as "native_qty_received!",
native_fee_or_rebate as "native_fee_or_rebate!"
from fills
where market = $1
and maker = true
ORDER BY time asc LIMIT 1"#,
market_address_string
)
.fetch_optional(conn)
.await
.map_err_anyhow()
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
native_qty_paid as "native_qty_paid!",
native_qty_received as "native_qty_received!",
native_fee_or_rebate as "native_fee_or_rebate!"
from fills
where market = $1
and maker = true
ORDER BY time asc LIMIT 1"#,
)
.await?;
let row = client.query_opt(&stmt, &[&market_address_string]).await?;
match row {
Some(r) => Ok(Some(PgOpenBookFill::from_row(r))),
None => Ok(None),
}
}
pub async fn fetch_fills_from(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_address_string: &str,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<PgOpenBookFill>> {
sqlx::query_as!(
PgOpenBookFill,
r#"SELECT
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
time as "time!",
bid as "bid!",
maker as "maker!",
@ -53,31 +57,36 @@ pub async fn fetch_fills_from(
native_fee_or_rebate as "native_fee_or_rebate!"
from fills
where market = $1
and time >= $2
and time < $3
and time >= $2::timestamptz
and time < $3::timestamptz
and maker = true
ORDER BY time asc"#,
market_address_string,
start_time,
end_time
)
.fetch_all(conn)
.await
.map_err_anyhow()
)
.await?;
let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time])
.await?;
Ok(rows
.into_iter()
.map(|r| PgOpenBookFill::from_row(r))
.collect())
}
pub async fn fetch_latest_finished_candle(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Option<Candle>> {
sqlx::query_as!(
Candle,
r#"SELECT
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
@ -89,26 +98,33 @@ pub async fn fetch_latest_finished_candle(
and resolution = $2
and complete = true
ORDER BY start_time desc LIMIT 1"#,
market_name,
resolution.to_string()
)
.fetch_optional(conn)
.await
.map_err_anyhow()
)
.await?;
let row = client
.query_opt(&stmt, &[&market_name, &resolution.to_string()])
.await?;
match row {
Some(r) => Ok(Some(Candle::from_row(r))),
None => Ok(None),
}
}
pub async fn fetch_earliest_candles(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> {
sqlx::query_as!(
Candle,
r#"SELECT
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
@ -119,64 +135,32 @@ pub async fn fetch_earliest_candles(
where market_name = $1
and resolution = $2
ORDER BY start_time asc"#,
market_name,
resolution.to_string()
)
.fetch_all(conn)
.await
.map_err_anyhow()
)
.await?;
let rows = client
.query(&stmt, &[&market_name, &resolution.to_string()])
.await?;
Ok(rows.into_iter().map(|r| Candle::from_row(r)).collect())
}
pub async fn fetch_candles_from(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_name: &str,
resolution: Resolution,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<Candle>> {
sqlx::query_as!(
Candle,
r#"SELECT
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
low as "low!",
volume as "volume!",
complete as "complete!"
from candles
where market_name = $1
and resolution = $2
and start_time >= $3
and end_time <= $4
ORDER BY start_time asc"#,
market_name,
resolution.to_string(),
start_time,
end_time
)
.fetch_all(conn)
.await
.map_err_anyhow()
}
let client = pool.get().await?;
pub async fn fetch_tradingview_candles(
conn: &mut PoolConnection<Postgres>,
market_name: &str,
resolution: Resolution,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<Candle>> {
sqlx::query_as!(
Candle,
r#"SELECT
let stmt = client
.prepare(
r#"SELECT
market_name as "market_name!",
start_time as "start_time!",
end_time as "end_time!",
resolution as "resolution!",
market_name as "market_name!",
open as "open!",
close as "close!",
high as "high!",
@ -189,112 +173,135 @@ pub async fn fetch_tradingview_candles(
and start_time >= $3
and end_time <= $4
ORDER BY start_time asc"#,
market_name,
resolution.to_string(),
start_time,
end_time
)
.fetch_all(conn)
.await
.map_err_anyhow()
)
.await?;
let rows = client
.query(
&stmt,
&[
&market_name,
&resolution.to_string(),
&start_time,
&end_time,
],
)
.await?;
Ok(rows.into_iter().map(|r| Candle::from_row(r)).collect())
}
pub async fn fetch_top_traders_by_base_volume_from(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_address_string: &str,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<PgTrader>> {
sqlx::query_as!(
PgTrader,
r#"SELECT
open_orders_owner,
sum(
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
sum(
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#,
market_address_string,
start_time,
end_time
)
.fetch_all(conn)
.await
.map_err_anyhow()
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
open_orders_owner,
sum(
native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
sum(
native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_paid * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_received * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#,
)
.await?;
let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time])
.await?;
Ok(rows.into_iter().map(|r| PgTrader::from_row(r)).collect())
}
pub async fn fetch_top_traders_by_quote_volume_from(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_address_string: &str,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> anyhow::Result<Vec<PgTrader>> {
sqlx::query_as!(
PgTrader,
r#"SELECT
open_orders_owner,
sum(
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
sum(
native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#,
market_address_string,
start_time,
end_time
)
.fetch_all(conn)
.await
.map_err_anyhow()
let client = pool.get().await?;
let stmt = client
.prepare(
r#"SELECT
open_orders_owner,
sum(
native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END
) as "raw_ask_size!",
sum(
native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END
) as "raw_bid_size!"
FROM fills
WHERE market = $1
AND time >= $2
AND time < $3
GROUP BY open_orders_owner
ORDER BY
sum(native_qty_received * CASE bid WHEN true THEN 0 WHEN false THEN 1 END)
+
sum(native_qty_paid * CASE bid WHEN true THEN 1 WHEN false THEN 0 END)
DESC
LIMIT 10000"#,
)
.await?;
let rows = client
.query(&stmt, &[&market_address_string, &start_time, &end_time])
.await?;
Ok(rows.into_iter().map(|r| PgTrader::from_row(r)).collect())
}
pub async fn fetch_coingecko_24h_volume(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
) -> anyhow::Result<Vec<PgCoinGecko24HourVolume>> {
sqlx::query_as!(
PgCoinGecko24HourVolume,
r#"select market as "address!",
let client = pool.get().await?;
let stmt = client
.prepare(
r#"select market as "address!",
sum(native_qty_paid) as "raw_quote_size!",
sum(native_qty_received) as "raw_base_size!"
from fills
where "time" >= current_timestamp - interval '1 day'
and bid = true
group by market"#
)
.fetch_all(conn)
.await
.map_err_anyhow()
group by market"#,
)
.await?;
let rows = client.query(&stmt, &[]).await?;
Ok(rows
.into_iter()
.map(|r| PgCoinGecko24HourVolume::from_row(r))
.collect())
}
pub async fn fetch_coingecko_24h_high_low(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
) -> anyhow::Result<Vec<PgCoinGecko24HighLow>> {
sqlx::query_as!(
PgCoinGecko24HighLow,
r#"select
let client = pool.get().await?;
let stmt = client
.prepare(
r#"select
g.market_name as "market_name!",
g.high as "high!",
g.low as "low!",
@ -317,9 +324,14 @@ pub async fn fetch_coingecko_24h_high_low(
join candles c on g.market_name = c.market_name
and g.start_time = c.start_time
where
c.resolution = '1M'"#
)
.fetch_all(conn)
.await
.map_err_anyhow()
c.resolution = '1M'"#,
)
.await?;
let rows = client.query(&stmt, &[]).await?;
Ok(rows
.into_iter()
.map(|r| PgCoinGecko24HighLow::from_row(r))
.collect())
}

View File

@ -1,24 +1,68 @@
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use std::time::Duration;
use std::{fs, time::Duration};
use crate::utils::{AnyhowWrap, Config};
use deadpool_postgres::{
Config as PgConfig, ManagerConfig, Pool, PoolConfig, RecyclingMethod, Runtime, SslMode,
Timeouts,
};
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool<Postgres>> {
loop {
let pool = PgPoolOptions::new()
.max_connections(config.max_pg_pool_connections)
.connect(&config.database_url)
.await;
if pool.is_ok() {
println!("Database connected");
return pool.map_err_anyhow();
use crate::utils::Config;
pub async fn connect_to_database(config: &Config) -> anyhow::Result<Pool> {
let mut x = PgConfig::new();
// TODO: fix
x.host = Some("".to_owned());
x.user = Some("".to_owned());
x.password = Some("".to_owned());
x.dbname = Some("postgres".to_owned());
x.manager = Some(ManagerConfig {
recycling_method: RecyclingMethod::Fast,
});
x.pool = Some(PoolConfig {
max_size: config.max_pg_pool_connections,
timeouts: Timeouts::default(),
});
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
let tls = if config.use_ssl {
x.ssl_mode = Some(SslMode::Require);
let ca_cert = fs::read(&config.ca_cert_path).expect("reading client cert from file");
let client_key = fs::read(&config.client_key_path).expect("reading client key from file");
MakeTlsConnector::new(
TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
.identity(Identity::from_pkcs12(&client_key, "pass")?)
.danger_accept_invalid_certs(false)
.build()?,
)
} else {
MakeTlsConnector::new(
TlsConnector::builder()
.danger_accept_invalid_certs(true)
.build()
.unwrap(),
)
};
let pool = x.create_pool(Some(Runtime::Tokio1), tls).unwrap();
match pool.get().await {
Ok(_) => println!("Database connected"),
Err(e) => {
println!("Failed to connect to database: {}, retrying", e.to_string());
tokio::time::sleep(Duration::from_millis(500)).await;
}
println!("Failed to connect to database, retrying");
tokio::time::sleep(Duration::from_millis(500)).await;
}
Ok(pool)
}
pub async fn setup_database(pool: &Pool<Postgres>) -> anyhow::Result<()> {
pub async fn setup_database(pool: &Pool) -> anyhow::Result<()> {
let candles_table_fut = create_candles_table(pool);
let fills_table_fut = create_fills_table(pool);
let result = tokio::try_join!(candles_table_fut, fills_table_fut);
@ -34,50 +78,51 @@ pub async fn setup_database(pool: &Pool<Postgres>) -> anyhow::Result<()> {
}
}
pub async fn create_candles_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
let mut tx = pool.begin().await.map_err_anyhow()?;
pub async fn create_candles_table(pool: &Pool) -> anyhow::Result<()> {
let client = pool.get().await?;
sqlx::query!(
"CREATE TABLE IF NOT EXISTS candles (
client
.execute(
"CREATE TABLE IF NOT EXISTS candles (
id serial,
market_name text,
start_time timestamptz,
end_time timestamptz,
resolution text,
open numeric,
close numeric,
high numeric,
low numeric,
volume numeric,
open double precision,
close double precision,
high double precision,
low double precision,
volume double precision,
complete bool
)",
)
.execute(&mut tx)
.await?;
&[],
)
.await?;
sqlx::query!(
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)"
).execute(&mut tx).await?;
client.execute(
"CREATE INDEX IF NOT EXISTS idx_market_time_resolution ON candles (market_name, start_time, resolution)",
&[]
).await?;
sqlx::query!(
client.execute(
"DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_constraint WHERE conname = 'unique_candles') THEN
ALTER TABLE candles ADD CONSTRAINT unique_candles UNIQUE (market_name, start_time, resolution);
END IF;
END $$"
)
.execute(&mut tx)
.await?;
END $$", &[]
).await?;
tx.commit().await.map_err_anyhow()
Ok(())
}
pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
let mut tx = pool.begin().await.map_err_anyhow()?;
pub async fn create_fills_table(pool: &Pool) -> anyhow::Result<()> {
let client = pool.get().await?;
sqlx::query!(
"CREATE TABLE IF NOT EXISTS fills (
client
.execute(
"CREATE TABLE IF NOT EXISTS fills (
id numeric PRIMARY KEY,
time timestamptz not null,
market text not null,
@ -85,23 +130,28 @@ pub async fn create_fills_table(pool: &Pool<Postgres>) -> anyhow::Result<()> {
open_orders_owner text not null,
bid bool not null,
maker bool not null,
native_qty_paid numeric not null,
native_qty_received numeric not null,
native_fee_or_rebate numeric not null,
native_qty_paid double precision not null,
native_qty_received double precision not null,
native_fee_or_rebate double precision not null,
fee_tier text not null,
order_id text not null
)",
)
.execute(&mut tx)
.await?;
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)")
.execute(&mut tx)
&[],
)
.await?;
sqlx::query!("CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)")
.execute(&mut tx)
client
.execute(
"CREATE INDEX IF NOT EXISTS idx_id_market ON fills (id, market)",
&[],
)
.await?;
tx.commit().await.map_err_anyhow()
client
.execute(
"CREATE INDEX IF NOT EXISTS idx_market_time ON fills (market, time)",
&[],
)
.await?;
Ok(())
}

View File

@ -1,5 +1,5 @@
use chrono::Utc;
use sqlx::{Connection, Pool, Postgres};
use deadpool_postgres::Pool;
use std::{
collections::{hash_map::DefaultHasher, HashMap},
hash::{Hash, Hasher},
@ -12,10 +12,10 @@ use crate::{
};
pub async fn persist_fill_events(
pool: &Pool<Postgres>,
pool: &Pool,
fill_receiver: &mut Receiver<OpenBookFillEventLog>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await.unwrap();
let client = pool.get().await?;
loop {
let mut write_batch = HashMap::new();
while write_batch.len() < 10 {
@ -41,57 +41,57 @@ pub async fn persist_fill_events(
if write_batch.len() > 0 {
// print!("writing: {:?} events to DB\n", write_batch.len());
match conn.ping().await {
Ok(_) => {
let upsert_statement = build_fills_upsert_statement(write_batch);
sqlx::query(&upsert_statement)
.execute(&mut conn)
.await
.map_err_anyhow()
.unwrap();
}
Err(_) => {
println!("Fills ping failed");
break;
}
}
// match conn.ping().await {
// Ok(_) => {
let upsert_statement = build_fills_upsert_statement(write_batch);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()
.unwrap();
// }
// Err(_) => {
// println!("Fills ping failed");
// break;
// }
// }
}
}
Ok(())
}
pub async fn persist_candles(
pool: Pool<Postgres>,
pool: Pool,
candles_receiver: &mut Receiver<Vec<Candle>>,
) -> anyhow::Result<()> {
let mut conn = pool.acquire().await.unwrap();
let client = pool.get().await.unwrap();
loop {
match conn.ping().await {
Ok(_) => {
match candles_receiver.try_recv() {
Ok(candles) => {
if candles.len() == 0 {
continue;
}
// print!("writing: {:?} candles to DB\n", candles.len());
let upsert_statement = build_candes_upsert_statement(candles);
sqlx::query(&upsert_statement)
.execute(&mut conn)
.await
.map_err_anyhow()
.unwrap();
}
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
panic!("Candles sender must stay alive")
}
};
// match client.ping().await {
// Ok(_) => {
match candles_receiver.try_recv() {
Ok(candles) => {
if candles.len() == 0 {
continue;
}
// print!("writing: {:?} candles to DB\n", candles.len());
let upsert_statement = build_candes_upsert_statement(candles);
client
.execute(&upsert_statement, &[])
.await
.map_err_anyhow()
.unwrap();
}
Err(_) => {
println!("Candle ping failed");
break;
Err(TryRecvError::Empty) => continue,
Err(TryRecvError::Disconnected) => {
panic!("Candles sender must stay alive")
}
};
// }
// Err(_) => {
// println!("Candle ping failed");
// break;
// }
// };
}
Ok(())
}

View File

@ -1,5 +1,5 @@
use openbook_candles::{
database::fetch::fetch_tradingview_candles,
database::fetch::fetch_candles_from,
structs::{markets::valid_market, resolution::Resolution, tradingview::TvResponse},
utils::{to_timestampz, WebContext},
};
@ -34,9 +34,8 @@ pub async fn get_candles(
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let mut conn = context.pool.acquire().await.unwrap();
let candles =
match fetch_tradingview_candles(&mut conn, &info.market_name, resolution, from, to).await {
match fetch_candles_from(&context.pool, &info.market_name, resolution, from, to).await {
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};

View File

@ -49,17 +49,14 @@ pub async fn pairs(context: web::Data<WebContext>) -> Result<HttpResponse, Serve
#[get("/tickers")]
pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, ServerError> {
let client = RpcClient::new(context.rpc_url.clone());
// let client = RpcClient::new(context.rpc_url.clone());
let markets = &context.markets;
let mut c1 = context.pool.acquire().await.unwrap();
let mut c2 = context.pool.acquire().await.unwrap();
// let bba_fut = get_best_bids_and_asks(client, markets);
let volume_fut = fetch_coingecko_24h_volume(&mut c1);
let high_low_fut = fetch_coingecko_24h_high_low(&mut c2);
let volume_fut = fetch_coingecko_24h_volume(&context.pool);
let high_low_fut = fetch_coingecko_24h_high_low(&context.pool);
let (volume_query, high_low_quey) =
join!(volume_fut, high_low_fut,);
let (volume_query, high_low_quey) = join!(volume_fut, high_low_fut,);
let raw_volumes = match volume_query {
Ok(c) => c,
@ -105,7 +102,7 @@ pub async fn tickers(context: web::Data<WebContext>) -> Result<HttpResponse, Ser
Ok(HttpResponse::Ok().json(tickers))
}
#[get("/orderbook")]
#[get("/orderbook")] // TODO: implement an optional geyser version
pub async fn orderbook(
info: web::Query<OrderBookParams>,
context: web::Data<WebContext>,

View File

@ -30,15 +30,21 @@ async fn main() -> std::io::Result<()> {
let path_to_markets_json = &args[1];
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_SERVER")
let use_ssl: bool = dotenv::var("USE_SSL").unwrap().parse::<bool>().unwrap();
let ca_cert_path: String = dotenv::var("CA_CERT_PATH").unwrap();
let client_key_path: String = dotenv::var("CLIENT_KEY_PATH").unwrap();
let max_pg_pool_connections: usize = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
.unwrap()
.parse::<u32>()
.parse::<usize>()
.unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
database_url: database_url.clone(),
database_url,
max_pg_pool_connections,
use_ssl,
ca_cert_path,
client_key_path,
};
let markets = load_markets(path_to_markets_json);

View File

@ -31,14 +31,17 @@ pub async fn get_top_traders_by_base_volume(
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let mut conn = context.pool.acquire().await.unwrap();
let raw_traders =
match fetch_top_traders_by_base_volume_from(&mut conn, &selected_market.address, from, to)
.await
{
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let raw_traders = match fetch_top_traders_by_base_volume_from(
&context.pool,
&selected_market.address,
from,
to,
)
.await
{
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let traders = raw_traders
.into_iter()
@ -67,14 +70,17 @@ pub async fn get_top_traders_by_quote_volume(
let from = to_timestampz(info.from);
let to = to_timestampz(info.to);
let mut conn = context.pool.acquire().await.unwrap();
let raw_traders =
match fetch_top_traders_by_quote_volume_from(&mut conn, &selected_market.address, from, to)
.await
{
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let raw_traders = match fetch_top_traders_by_quote_volume_from(
&context.pool,
&selected_market.address,
from,
to,
)
.await
{
Ok(c) => c,
Err(_) => return Err(ServerError::DbQueryError),
};
let traders = raw_traders
.into_iter()

View File

@ -1,6 +1,5 @@
use chrono::{DateTime, NaiveDateTime, Utc};
use num_traits::Zero;
use sqlx::types::Decimal;
use tokio_postgres::Row;
use super::resolution::Resolution;
@ -10,11 +9,11 @@ pub struct Candle {
pub start_time: DateTime<Utc>,
pub end_time: DateTime<Utc>,
pub resolution: String,
pub open: Decimal,
pub close: Decimal,
pub high: Decimal,
pub low: Decimal,
pub volume: Decimal,
pub open: f64,
pub close: f64,
pub high: f64,
pub low: f64,
pub volume: f64,
pub complete: bool,
}
@ -25,12 +24,27 @@ impl Candle {
start_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
end_time: DateTime::from_utc(NaiveDateTime::MIN, Utc),
resolution: resolution.to_string(),
open: Decimal::zero(),
close: Decimal::zero(),
high: Decimal::zero(),
low: Decimal::zero(),
volume: Decimal::zero(),
open: 0.0,
close: 0.0,
high: 0.0,
low: 0.0,
volume: 0.0,
complete: false,
}
}
pub fn from_row(row: Row) -> Self {
Candle {
market_name: row.get(0),
start_time: row.get(1),
end_time: row.get(2),
resolution: row.get(3),
open: row.get(4),
close: row.get(5),
high: row.get(6),
low: row.get(7),
volume: row.get(8),
complete: row.get(9),
}
}
}

View File

@ -1,5 +1,5 @@
use serde::Serialize;
use sqlx::types::Decimal;
use tokio_postgres::Row;
use super::{markets::MarketInfo, openbook::token_factor};
@ -35,8 +35,8 @@ pub struct CoinGeckoTicker {
pub struct PgCoinGecko24HourVolume {
pub address: String,
pub raw_base_size: Decimal,
pub raw_quote_size: Decimal,
pub raw_base_size: f64,
pub raw_quote_size: f64,
}
impl PgCoinGecko24HourVolume {
pub fn convert_to_readable(&self, markets: &Vec<MarketInfo>) -> CoinGecko24HourVolume {
@ -49,19 +49,38 @@ impl PgCoinGecko24HourVolume {
target_volume,
}
}
pub fn from_row(row: Row) -> Self {
PgCoinGecko24HourVolume {
address: row.get(0),
raw_base_size: row.get(1),
raw_quote_size: row.get(2),
}
}
}
#[derive(Debug, Default)]
pub struct CoinGecko24HourVolume {
pub market_name: String,
pub base_volume: Decimal,
pub target_volume: Decimal,
pub base_volume: f64,
pub target_volume: f64,
}
#[derive(Debug, Default)]
pub struct PgCoinGecko24HighLow {
pub market_name: String,
pub high: Decimal,
pub low: Decimal,
pub close: Decimal,
pub high: f64,
pub low: f64,
pub close: f64,
}
impl PgCoinGecko24HighLow {
pub fn from_row(row: Row) -> Self {
PgCoinGecko24HighLow {
market_name: row.get(0),
high: row.get(1),
low: row.get(2),
close: row.get(3),
}
}
}

View File

@ -1,8 +1,8 @@
use anchor_lang::{event, AnchorDeserialize, AnchorSerialize};
use chrono::{DateTime, Utc};
use num_traits::FromPrimitive;
use num_traits::Pow;
use solana_sdk::pubkey::Pubkey;
use sqlx::types::Decimal;
use tokio_postgres::Row;
#[event]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@ -27,9 +27,21 @@ pub struct PgOpenBookFill {
pub time: DateTime<Utc>,
pub bid: bool,
pub maker: bool,
pub native_qty_paid: Decimal,
pub native_qty_received: Decimal,
pub native_fee_or_rebate: Decimal,
pub native_qty_paid: f64,
pub native_qty_received: f64,
pub native_fee_or_rebate: f64,
}
impl PgOpenBookFill {
pub fn from_row(row: Row) -> Self {
PgOpenBookFill {
time: row.get(0),
bid: row.get(1),
maker: row.get(2),
native_qty_paid: row.get(3),
native_qty_received: row.get(4),
native_fee_or_rebate: row.get(5),
}
}
}
#[derive(Copy, Clone, AnchorDeserialize)]
@ -91,7 +103,7 @@ pub fn calculate_fill_price_and_size(
fill: PgOpenBookFill,
base_decimals: u8,
quote_decimals: u8,
) -> (Decimal, Decimal) {
) -> (f64, f64) {
if fill.bid {
let price_before_fees = if fill.maker {
fill.native_qty_paid + fill.native_fee_or_rebate
@ -115,6 +127,6 @@ pub fn calculate_fill_price_and_size(
}
}
pub fn token_factor(decimals: u8) -> Decimal {
Decimal::from_u64(10u64.pow(decimals as u32)).unwrap()
pub fn token_factor(decimals: u8) -> f64 {
10f64.pow(decimals as f64)
}

View File

@ -5,7 +5,6 @@ use futures::join;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use num_traits::ToPrimitive;
use solana_client::nonblocking::rpc_client::RpcClient;
use sqlx::types::Decimal;
use std::{
convert::TryFrom,
mem::{align_of, size_of},
@ -102,19 +101,19 @@ impl LeafNode {
NonZeroU64::new((self.key >> 64) as u64).unwrap()
}
pub fn readable_price(&self, market: &MarketInfo) -> Decimal {
let price_lots = Decimal::from((self.key >> 64) as u64);
pub fn readable_price(&self, market: &MarketInfo) -> f64 {
let price_lots = (self.key >> 64) as f64;
let base_multiplier = token_factor(market.base_decimals);
let quote_multiplier = token_factor(market.quote_decimals);
let base_lot_size = Decimal::from(market.base_lot_size);
let quote_lot_size = Decimal::from(market.quote_lot_size);
let base_lot_size = market.base_lot_size as f64;
let quote_lot_size = market.quote_lot_size as f64;
(price_lots * quote_lot_size * base_multiplier) / (base_lot_size * quote_multiplier)
}
pub fn readable_quantity(&self, market: &MarketInfo) -> Decimal {
let base_lot_size = Decimal::from(market.base_lot_size);
pub fn readable_quantity(&self, market: &MarketInfo) -> f64 {
let base_lot_size = market.base_lot_size as f64;
let base_multiplier = token_factor(market.base_decimals);
Decimal::from(self.quantity) * base_lot_size / base_multiplier
self.quantity as f64 * base_lot_size / base_multiplier
}
#[inline]
@ -406,7 +405,7 @@ impl Slab {
}
}
pub fn get_best(&self, market: &MarketInfo, bid: bool) -> Decimal {
pub fn get_best(&self, market: &MarketInfo, bid: bool) -> f64 {
let min = if bid {
self.find_max()
} else {
@ -419,7 +418,7 @@ impl Slab {
pub async fn get_best_bids_and_asks(
client: RpcClient,
markets: &Vec<MarketInfo>,
) -> (Vec<Decimal>, Vec<Decimal>) {
) -> (Vec<f64>, Vec<f64>) {
let bid_keys = markets
.iter()
.map(|m| Pubkey::from_str(&m.bids_key).unwrap())

View File

@ -2,15 +2,24 @@ use std::fmt;
use num_traits::ToPrimitive;
use serde::Serialize;
use sqlx::types::Decimal;
use tokio_postgres::Row;
use super::openbook::token_factor;
#[derive(Clone, Debug, PartialEq)]
pub struct PgTrader {
pub open_orders_owner: String,
pub raw_ask_size: Decimal,
pub raw_bid_size: Decimal,
pub raw_ask_size: f64,
pub raw_bid_size: f64,
}
impl PgTrader {
pub fn from_row(row: Row) -> Self {
PgTrader {
open_orders_owner: row.get(0),
raw_ask_size: row.get(1),
raw_bid_size: row.get(2),
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize)]

View File

@ -1,6 +1,6 @@
use chrono::{NaiveDateTime, Utc};
use deadpool_postgres::Pool;
use serde_derive::Deserialize;
use sqlx::{Pool, Postgres};
use crate::structs::markets::MarketInfo;
@ -20,16 +20,35 @@ impl<T, E: std::fmt::Debug> AnyhowWrap for Result<T, E> {
pub struct Config {
pub rpc_url: String,
pub database_url: String,
pub max_pg_pool_connections: u32,
pub max_pg_pool_connections: usize,
pub use_ssl: bool,
pub ca_cert_path: String,
pub client_key_path: String,
}
pub struct WebContext {
pub rpc_url: String,
pub markets: Vec<MarketInfo>,
pub pool: Pool<Postgres>,
pub pool: Pool,
}
#[allow(deprecated)]
pub fn to_timestampz(seconds: u64) -> chrono::DateTime<Utc> {
chrono::DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(seconds as i64, 0), Utc)
}
pub(crate) fn f64_max(a: f64, b: f64) -> f64 {
if a >= b {
a
} else {
b
}
}
pub(crate) fn f64_min(a: f64, b: f64) -> f64 {
if a < b {
a
} else {
b
}
}

View File

@ -1,6 +1,6 @@
use chrono::{DateTime, Duration, DurationRound, Utc};
use sqlx::{pool::PoolConnection, Postgres};
use std::cmp::{max, min};
use deadpool_postgres::Pool;
use std::cmp::max;
use crate::{
database::fetch::{fetch_candles_from, fetch_earliest_candles, fetch_latest_finished_candle},
@ -8,21 +8,22 @@ use crate::{
candle::Candle,
resolution::{day, Resolution},
},
utils::{f64_max, f64_min},
};
pub async fn batch_higher_order_candles(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
market_name: &str,
resolution: Resolution,
) -> anyhow::Result<Vec<Candle>> {
let latest_candle = fetch_latest_finished_candle(conn, market_name, resolution).await?;
let latest_candle = fetch_latest_finished_candle(pool, market_name, resolution).await?;
match latest_candle {
Some(candle) => {
let start_time = candle.end_time;
let end_time = start_time + day();
let mut constituent_candles = fetch_candles_from(
conn,
pool,
market_name,
resolution.get_constituent_resolution(),
start_time,
@ -42,7 +43,7 @@ pub async fn batch_higher_order_candles(
}
None => {
let mut constituent_candles =
fetch_earliest_candles(conn, market_name, resolution.get_constituent_resolution())
fetch_earliest_candles(pool, market_name, resolution.get_constituent_resolution())
.await?;
if constituent_candles.len() == 0 {
// println!(
@ -112,8 +113,8 @@ fn combine_into_higher_order_candles(
while matches!(con_iter.peek(), Some(c) if c.end_time <= end_time) {
let unit_candle = con_iter.next().unwrap();
combined_candles[i].high = max(combined_candles[i].high, unit_candle.high);
combined_candles[i].low = min(combined_candles[i].low, unit_candle.low);
combined_candles[i].high = f64_max(combined_candles[i].high, unit_candle.high);
combined_candles[i].low = f64_min(combined_candles[i].low, unit_candle.low);
combined_candles[i].close = unit_candle.close;
combined_candles[i].volume += unit_candle.volume;
combined_candles[i].complete = unit_candle.complete;

View File

@ -1,7 +1,7 @@
use std::cmp::{max, min};
use std::cmp::min;
use chrono::{DateTime, Duration, DurationRound, Utc};
use sqlx::{pool::PoolConnection, types::Decimal, Postgres};
use deadpool_postgres::Pool;
use crate::{
database::fetch::{fetch_earliest_fill, fetch_fills_from, fetch_latest_finished_candle},
@ -11,15 +11,13 @@ use crate::{
openbook::{calculate_fill_price_and_size, PgOpenBookFill},
resolution::{day, Resolution},
},
utils::{f64_max, f64_min},
};
pub async fn batch_1m_candles(
conn: &mut PoolConnection<Postgres>,
market: &MarketInfo,
) -> anyhow::Result<Vec<Candle>> {
pub async fn batch_1m_candles(pool: &Pool, market: &MarketInfo) -> anyhow::Result<Vec<Candle>> {
let market_name = &market.name;
let market_address = &market.address;
let latest_candle = fetch_latest_finished_candle(conn, market_name, Resolution::R1m).await?;
let latest_candle = fetch_latest_finished_candle(pool, market_name, Resolution::R1m).await?;
match latest_candle {
Some(candle) => {
@ -28,7 +26,7 @@ pub async fn batch_1m_candles(
start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
let candles = combine_fills_into_1m_candles(
&mut fills,
market,
@ -39,7 +37,7 @@ pub async fn batch_1m_candles(
Ok(candles)
}
None => {
let earliest_fill = fetch_earliest_fill(conn, market_address).await?;
let earliest_fill = fetch_earliest_fill(pool, market_address).await?;
if earliest_fill.is_none() {
println!("No fills found for: {:?}", market_name);
@ -54,7 +52,7 @@ pub async fn batch_1m_candles(
start_time + day(),
Utc::now().duration_trunc(Duration::minutes(1))?,
);
let mut fills = fetch_fills_from(conn, market_address, start_time, end_time).await?;
let mut fills = fetch_fills_from(pool, market_address, start_time, end_time).await?;
if fills.len() > 0 {
let candles =
combine_fills_into_1m_candles(&mut fills, market, start_time, end_time, None);
@ -71,7 +69,7 @@ fn combine_fills_into_1m_candles(
market: &MarketInfo,
st: DateTime<Utc>,
et: DateTime<Utc>,
maybe_last_price: Option<Decimal>,
maybe_last_price: Option<f64>,
) -> Vec<Candle> {
let empty_candle = Candle::create_empty_candle(market.name.clone(), Resolution::R1m);
@ -105,8 +103,8 @@ fn combine_fills_into_1m_candles(
calculate_fill_price_and_size(*fill, market.base_decimals, market.quote_decimals);
candles[i].close = price;
candles[i].low = min(price, candles[i].low);
candles[i].high = max(price, candles[i].high);
candles[i].low = f64_min(price, candles[i].low);
candles[i].high = f64_max(price, candles[i].high);
candles[i].volume += volume;
last_price = price;

View File

@ -2,7 +2,7 @@ pub mod higher_order_candles;
pub mod minute_candles;
use chrono::Duration;
use sqlx::{pool::PoolConnection, Pool, Postgres};
use deadpool_postgres::Pool;
use strum::IntoEnumIterator;
use tokio::{sync::mpsc::Sender, time::sleep};
@ -14,17 +14,17 @@ use crate::{
use self::higher_order_candles::batch_higher_order_candles;
pub async fn batch_for_market(
pool: Pool<Postgres>,
pool: &Pool,
candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo,
) -> anyhow::Result<()> {
loop {
let sender = candles_sender.clone();
let market_clone = market.clone();
let mut conn = pool.acquire().await?;
// let client = pool.get().await?;
loop {
sleep(Duration::milliseconds(2000).to_std()?).await;
match batch_inner(&mut conn, &sender, &market_clone).await {
match batch_inner(pool, &sender, &market_clone).await {
Ok(_) => {}
Err(e) => {
println!(
@ -41,19 +41,19 @@ pub async fn batch_for_market(
}
async fn batch_inner(
conn: &mut PoolConnection<Postgres>,
pool: &Pool,
candles_sender: &Sender<Vec<Candle>>,
market: &MarketInfo,
) -> anyhow::Result<()> {
let market_name = &market.name.clone();
let candles = batch_1m_candles(conn, market).await?;
let candles = batch_1m_candles(pool, market).await?;
send_candles(candles, candles_sender).await;
for resolution in Resolution::iter() {
if resolution == Resolution::R1m {
continue;
}
let candles = batch_higher_order_candles(conn, market_name, resolution).await?;
let candles = batch_higher_order_candles(pool, market_name, resolution).await?;
send_candles(candles, candles_sender).await;
}
Ok(())

View File

@ -1,14 +1,16 @@
use dotenv;
use openbook_candles::database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
};
use openbook_candles::structs::candle::Candle;
use openbook_candles::structs::markets::{fetch_market_infos, load_markets};
use openbook_candles::structs::openbook::OpenBookFillEventLog;
use openbook_candles::utils::Config;
use openbook_candles::worker::candle_batching::batch_for_market;
use openbook_candles::worker::trade_fetching::scrape::scrape;
use openbook_candles::{
database::{
initialize::{connect_to_database, setup_database},
insert::{persist_candles, persist_fill_events},
},
worker::candle_batching::batch_for_market,
};
use solana_sdk::pubkey::Pubkey;
use std::env;
use std::{collections::HashMap, str::FromStr};
@ -23,15 +25,21 @@ async fn main() -> anyhow::Result<()> {
let path_to_markets_json = &args[1];
let rpc_url: String = dotenv::var("RPC_URL").unwrap();
let database_url: String = dotenv::var("DATABASE_URL").unwrap();
let max_pg_pool_connections: u32 = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
let use_ssl: bool = dotenv::var("USE_SSL").unwrap().parse::<bool>().unwrap();
let ca_cert_path: String = dotenv::var("CA_CERT_PATH").unwrap();
let client_key_path: String = dotenv::var("CLIENT_KEY_PATH").unwrap();
let max_pg_pool_connections: usize = dotenv::var("MAX_PG_POOL_CONNS_WORKER")
.unwrap()
.parse::<u32>()
.parse::<usize>()
.unwrap();
let config = Config {
rpc_url: rpc_url.clone(),
database_url,
max_pg_pool_connections,
use_ssl,
ca_cert_path,
client_key_path,
};
let markets = load_markets(&path_to_markets_json);
@ -49,7 +57,7 @@ async fn main() -> anyhow::Result<()> {
let (fill_sender, mut fill_receiver) = mpsc::channel::<OpenBookFillEventLog>(1000);
handles.push(tokio::spawn(async move {
scrape(&config, &fill_sender, &target_markets).await; //TODO: send the vec, it's okay
scrape(&config, &fill_sender, &target_markets).await;
}));
let fills_pool = pool.clone();
@ -67,7 +75,7 @@ async fn main() -> anyhow::Result<()> {
let sender = candle_sender.clone();
let batch_pool = pool.clone();
handles.push(tokio::spawn(async move {
batch_for_market(batch_pool, &sender, &market)
batch_for_market(&batch_pool, &sender, &market)
.await
.unwrap();
println!("SOMETHING WENT WRONG");