diff --git a/Cargo.lock b/Cargo.lock index 53e7bd5..e7090aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4292,6 +4292,8 @@ dependencies = [ "async-stream 0.2.1", "async-trait", "bs58 0.3.1", + "bytes 1.1.0", + "fixed", "futures 0.3.17", "futures-core", "futures-util", @@ -4323,7 +4325,6 @@ dependencies = [ "anyhow", "async-trait", "bs58 0.3.1", - "bytes 1.1.0", "fixed", "log 0.4.14", "mango", diff --git a/connector-mango/Cargo.toml b/connector-mango/Cargo.toml index 8461be6..aa6f862 100644 --- a/connector-mango/Cargo.toml +++ b/connector-mango/Cargo.toml @@ -18,7 +18,6 @@ tokio = { version = "1", features = ["full"] } tokio-postgres = "0.7.4" postgres-types = { version = "0.2", features = ["array-impls", "derive"] } postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" } -bytes = "1.0" # The v3.2 branch currently has a conflicting syn version dependency, use a fixed version mango = { git = "https://github.com/ckamm/mango-v3", rev = "2e1e4886544bdb1c5f773a3856305210f622fa94" } diff --git a/connector-mango/src/mango.rs b/connector-mango/src/mango.rs index 41a1299..a5b2bcc 100644 --- a/connector-mango/src/mango.rs +++ b/connector-mango/src/mango.rs @@ -1,230 +1,12 @@ use { async_trait::async_trait, - bytes::{BufMut, BytesMut}, - fixed::types::I80F48, mango::state::{DataType, MangoAccount, MangoCache, MangoGroup}, mango_common::Loadable, - postgres_types::{IsNull, ToSql, Type}, - std::{cmp, error, mem}, + postgres_types::ToSql, + std::mem, }; -use crate::{encode_address, AccountTable, AccountWrite}; - -#[derive(Debug, Clone)] -pub struct SqlNumericI80F48(pub I80F48); - -impl ToSql for SqlNumericI80F48 { - fn to_sql( - &self, - _: &postgres_types::Type, - out: &mut BytesMut, - ) -> Result> { - if self.0 == 0 { - out.reserve(10); - out.put_u16(1); // num groups - out.put_i16(0); // first group weight - out.put_u16(0); // sign - out.put_u16(0); // dscale - out.put_i16(0); // first group - return Ok(IsNull::No); - } - - let abs_val = self.0.abs(); - let decimals = abs_val.int_log10(); - let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16; - let last_group_weight = -4i16; - let num_groups = (first_group_weight - last_group_weight + 1) as usize; - - // Reserve bytes - out.reserve(8 + num_groups * 2); - - // Number of groups - out.put_u16(num_groups as u16); - // Weight of first group - out.put_i16(first_group_weight); - // Sign - out.put_u16(if self.0 < 0 { 0x4000 } else { 0x0000 }); - // DScale - out.put_u16(16); - - let mut int_part = abs_val.int().to_num::(); - let mut frac_part = (abs_val.frac() * I80F48::from_num(1e16)).to_num::(); - - //info!("i80f48 {} {} {} {} {}", self.0, decimals, first_group_weight, int_part, frac_part); - - for weight in (0..=first_group_weight).rev() { - let decimal_shift = 10000u128.pow(weight as u32); - let v = (int_part / decimal_shift) & 0xFFFF; - out.put_i16(v as i16); - //info!("int {} {} {}", weight, v, int_part); - int_part -= v * decimal_shift; - } - for weight in (last_group_weight..=cmp::min(first_group_weight, -1)).rev() { - let decimal_shift = 10000u64.pow((4 + weight) as u32); - let v = (frac_part / decimal_shift) & 0xFFFF; - out.put_i16(v as i16); - //info!("frac {} {} {}", weight, v, frac_part); - frac_part -= v * decimal_shift; - } - - Ok(IsNull::No) - } - - fn accepts(ty: &Type) -> bool { - matches!(*ty, Type::NUMERIC) - } - - postgres_types::to_sql_checked!(); -} - -// from https://github.com/rust-lang/rust/pull/86930 -mod int_log { - // 0 < val < 100_000_000 - const fn less_than_8(mut val: u32) -> u32 { - let mut log = 0; - if val >= 10_000 { - val /= 10_000; - log += 4; - } - log + if val >= 1000 { - 3 - } else if val >= 100 { - 2 - } else if val >= 10 { - 1 - } else { - 0 - } - } - - // 0 < val < 10_000_000_000_000_000 - const fn less_than_16(mut val: u64) -> u32 { - let mut log = 0; - if val >= 100_000_000 { - val /= 100_000_000; - log += 8; - } - log + less_than_8(val as u32) - } - - // 0 < val <= u64::MAX - pub const fn u64(mut val: u64) -> u32 { - let mut log = 0; - if val >= 10_000_000_000_000_000 { - val /= 10_000_000_000_000_000; - log += 16; - } - log + less_than_16(val) - } - - // 0 < val <= u128::MAX - pub const fn u128(mut val: u128) -> u32 { - let mut log = 0; - if val >= 100_000_000_000_000_000_000_000_000_000_000 { - val /= 100_000_000_000_000_000_000_000_000_000_000; - log += 32; - return log + less_than_8(val as u32); - } - if val >= 10_000_000_000_000_000 { - val /= 10_000_000_000_000_000; - log += 16; - } - log + less_than_16(val as u64) - } -} - -#[derive(Debug, Clone)] -pub struct SqlNumericI128(pub i128); - -impl ToSql for SqlNumericI128 { - fn to_sql( - &self, - _: &postgres_types::Type, - out: &mut BytesMut, - ) -> Result> { - let abs_val = self.0.abs() as u128; - let decimals = if self.0 != 0 { - int_log::u128(abs_val) - } else { - 0 - }; - let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16; - let num_groups = (first_group_weight + 1) as usize; - - // Reserve bytes - out.reserve(8 + num_groups * 2); - - // Number of groups - out.put_u16(num_groups as u16); - // Weight of first group - out.put_i16(first_group_weight); - // Sign - out.put_u16(if self.0 < 0 { 0x4000 } else { 0x0000 }); - // DScale - out.put_u16(0); - - let mut int_part = abs_val; - - for weight in (0..=first_group_weight).rev() { - let decimal_shift = 10000u128.pow(weight as u32); - let v = (int_part / decimal_shift) & 0xFFFF; - out.put_i16(v as i16); - int_part -= v * decimal_shift; - } - - Ok(IsNull::No) - } - - fn accepts(ty: &Type) -> bool { - matches!(*ty, Type::NUMERIC) - } - - postgres_types::to_sql_checked!(); -} - -#[derive(Debug, Clone)] -pub struct SqlNumericU64(pub u64); - -impl ToSql for SqlNumericU64 { - fn to_sql( - &self, - _: &postgres_types::Type, - out: &mut BytesMut, - ) -> Result> { - let decimals = if self.0 != 0 { int_log::u64(self.0) } else { 0 }; - let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16; - let num_groups = (first_group_weight + 1) as usize; - - // Reserve bytes - out.reserve(8 + num_groups * 2); - - // Number of groups - out.put_u16(num_groups as u16); - // Weight of first group - out.put_i16(first_group_weight); - // Sign - out.put_u16(0); - // DScale - out.put_u16(0); - - let mut int_part = self.0; - - for weight in (0..=first_group_weight).rev() { - let decimal_shift = 10000u64.pow(weight as u32); - let v = (int_part / decimal_shift) & 0xFFFF; - out.put_i16(v as i16); - int_part -= v * decimal_shift; - } - - Ok(IsNull::No) - } - - fn accepts(ty: &Type) -> bool { - matches!(*ty, Type::NUMERIC) - } - - postgres_types::to_sql_checked!(); -} +use crate::{encode_address, postgres_types_numeric::*, AccountTable, AccountWrite}; #[derive(Debug, ToSql)] struct PerpAccount { diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 32fc604..4f8234f 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -35,6 +35,8 @@ bs58 = "0.3.1" log = "0.4" rand = "0.7" anyhow = "1.0" +fixed = { version = "=1.9.0", features = ["serde"] } +bytes = "1.0" futures = "0.3.17" futures-core = "0.3" diff --git a/lib/src/lib.rs b/lib/src/lib.rs index a8625ff..af9e65b 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,6 +1,7 @@ pub mod grpc_plugin_source; pub mod metrics; pub mod postgres_target; +pub mod postgres_types_numeric; pub mod websocket_source; use { diff --git a/lib/src/postgres_types_numeric.rs b/lib/src/postgres_types_numeric.rs new file mode 100644 index 0000000..c9ef94f --- /dev/null +++ b/lib/src/postgres_types_numeric.rs @@ -0,0 +1,222 @@ +use { + bytes::{BufMut, BytesMut}, + fixed::types::I80F48, + postgres_types::{IsNull, ToSql, Type}, + std::{cmp, error}, +}; + +#[derive(Debug, Clone)] +pub struct SqlNumericI80F48(pub I80F48); + +impl ToSql for SqlNumericI80F48 { + fn to_sql( + &self, + _: &postgres_types::Type, + out: &mut BytesMut, + ) -> Result> { + if self.0 == 0 { + out.reserve(10); + out.put_u16(1); // num groups + out.put_i16(0); // first group weight + out.put_u16(0); // sign + out.put_u16(0); // dscale + out.put_i16(0); // first group + return Ok(IsNull::No); + } + + let abs_val = self.0.abs(); + let decimals = abs_val.int_log10(); + let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16; + let last_group_weight = -4i16; + let num_groups = (first_group_weight - last_group_weight + 1) as usize; + + // Reserve bytes + out.reserve(8 + num_groups * 2); + + // Number of groups + out.put_u16(num_groups as u16); + // Weight of first group + out.put_i16(first_group_weight); + // Sign + out.put_u16(if self.0 < 0 { 0x4000 } else { 0x0000 }); + // DScale + out.put_u16(16); + + let mut int_part = abs_val.int().to_num::(); + let mut frac_part = (abs_val.frac() * I80F48::from_num(1e16)).to_num::(); + + //info!("i80f48 {} {} {} {} {}", self.0, decimals, first_group_weight, int_part, frac_part); + + for weight in (0..=first_group_weight).rev() { + let decimal_shift = 10000u128.pow(weight as u32); + let v = (int_part / decimal_shift) & 0xFFFF; + out.put_i16(v as i16); + //info!("int {} {} {}", weight, v, int_part); + int_part -= v * decimal_shift; + } + for weight in (last_group_weight..=cmp::min(first_group_weight, -1)).rev() { + let decimal_shift = 10000u64.pow((4 + weight) as u32); + let v = (frac_part / decimal_shift) & 0xFFFF; + out.put_i16(v as i16); + //info!("frac {} {} {}", weight, v, frac_part); + frac_part -= v * decimal_shift; + } + + Ok(IsNull::No) + } + + fn accepts(ty: &Type) -> bool { + matches!(*ty, Type::NUMERIC) + } + + postgres_types::to_sql_checked!(); +} + +// from https://github.com/rust-lang/rust/pull/86930 +mod int_log { + // 0 < val < 100_000_000 + const fn less_than_8(mut val: u32) -> u32 { + let mut log = 0; + if val >= 10_000 { + val /= 10_000; + log += 4; + } + log + if val >= 1000 { + 3 + } else if val >= 100 { + 2 + } else if val >= 10 { + 1 + } else { + 0 + } + } + + // 0 < val < 10_000_000_000_000_000 + const fn less_than_16(mut val: u64) -> u32 { + let mut log = 0; + if val >= 100_000_000 { + val /= 100_000_000; + log += 8; + } + log + less_than_8(val as u32) + } + + // 0 < val <= u64::MAX + pub const fn u64(mut val: u64) -> u32 { + let mut log = 0; + if val >= 10_000_000_000_000_000 { + val /= 10_000_000_000_000_000; + log += 16; + } + log + less_than_16(val) + } + + // 0 < val <= u128::MAX + pub const fn u128(mut val: u128) -> u32 { + let mut log = 0; + if val >= 100_000_000_000_000_000_000_000_000_000_000 { + val /= 100_000_000_000_000_000_000_000_000_000_000; + log += 32; + return log + less_than_8(val as u32); + } + if val >= 10_000_000_000_000_000 { + val /= 10_000_000_000_000_000; + log += 16; + } + log + less_than_16(val as u64) + } +} + +#[derive(Debug, Clone)] +pub struct SqlNumericI128(pub i128); + +impl ToSql for SqlNumericI128 { + fn to_sql( + &self, + _: &postgres_types::Type, + out: &mut BytesMut, + ) -> Result> { + let abs_val = self.0.abs() as u128; + let decimals = if self.0 != 0 { + int_log::u128(abs_val) + } else { + 0 + }; + let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16; + let num_groups = (first_group_weight + 1) as usize; + + // Reserve bytes + out.reserve(8 + num_groups * 2); + + // Number of groups + out.put_u16(num_groups as u16); + // Weight of first group + out.put_i16(first_group_weight); + // Sign + out.put_u16(if self.0 < 0 { 0x4000 } else { 0x0000 }); + // DScale + out.put_u16(0); + + let mut int_part = abs_val; + + for weight in (0..=first_group_weight).rev() { + let decimal_shift = 10000u128.pow(weight as u32); + let v = (int_part / decimal_shift) & 0xFFFF; + out.put_i16(v as i16); + int_part -= v * decimal_shift; + } + + Ok(IsNull::No) + } + + fn accepts(ty: &Type) -> bool { + matches!(*ty, Type::NUMERIC) + } + + postgres_types::to_sql_checked!(); +} + +#[derive(Debug, Clone)] +pub struct SqlNumericU64(pub u64); + +impl ToSql for SqlNumericU64 { + fn to_sql( + &self, + _: &postgres_types::Type, + out: &mut BytesMut, + ) -> Result> { + let decimals = if self.0 != 0 { int_log::u64(self.0) } else { 0 }; + let first_group_weight = ((decimals as f64) / 4.0f64).floor() as i16; + let num_groups = (first_group_weight + 1) as usize; + + // Reserve bytes + out.reserve(8 + num_groups * 2); + + // Number of groups + out.put_u16(num_groups as u16); + // Weight of first group + out.put_i16(first_group_weight); + // Sign + out.put_u16(0); + // DScale + out.put_u16(0); + + let mut int_part = self.0; + + for weight in (0..=first_group_weight).rev() { + let decimal_shift = 10000u64.pow(weight as u32); + let v = (int_part / decimal_shift) & 0xFFFF; + out.put_i16(v as i16); + int_part -= v * decimal_shift; + } + + Ok(IsNull::No) + } + + fn accepts(ty: &Type) -> bool { + matches!(*ty, Type::NUMERIC) + } + + postgres_types::to_sql_checked!(); +}