From d5eaa385daf4ecf1ed80e8f9ac043032357cd375 Mon Sep 17 00:00:00 2001 From: Geoff Taylor Date: Tue, 17 Aug 2021 12:39:55 +0100 Subject: [PATCH] Removed rx.core.typing for many classes - using the actual base classes now. --- bin/liquidator | 1 - mango/healthcheck.py | 2 +- mango/observables.py | 12 ++++++------ mango/oracle.py | 2 +- mango/oracles/ftx/ftx.py | 2 +- mango/oracles/pythnetwork/pythnetwork.py | 2 +- mango/oracles/serum/serum.py | 2 +- mango/oracles/stub/stub.py | 2 +- mango/reconnectingwebsocket.py | 1 - 9 files changed, 12 insertions(+), 14 deletions(-) diff --git a/bin/liquidator b/bin/liquidator index 8094cff..a3d4c7f 100755 --- a/bin/liquidator +++ b/bin/liquidator @@ -5,7 +5,6 @@ import logging import os import os.path import rx -import rx.core.typing import rx.operators as ops import sys import threading diff --git a/mango/healthcheck.py b/mango/healthcheck.py index db9c35b..bf2f8f6 100644 --- a/mango/healthcheck.py +++ b/mango/healthcheck.py @@ -30,7 +30,7 @@ class HealthCheck(rx.core.typing.Disposable): self.healthcheck_files_location: str = healthcheck_files_location self._to_dispose: typing.List[rx.core.typing.Disposable] = [] - def add(self, name: str, observable: rx.core.typing.Observable[typing.Any]): + def add(self, name: str, observable: rx.core.Observable): healthcheck_file_touch_disposer = observable.subscribe( on_next=lambda _: Path(f"{self.healthcheck_files_location}/mango_healthcheck_{name}").touch(mode=0o666, exist_ok=True)) self._to_dispose += [healthcheck_file_touch_disposer] diff --git a/mango/observables.py b/mango/observables.py index fb38764..18fc093 100644 --- a/mango/observables.py +++ b/mango/observables.py @@ -36,7 +36,7 @@ from rxpy_backpressure import BackPressure # -class NullObserverSubscriber(rx.core.typing.Observer): +class NullObserverSubscriber(rx.core.Observer): def __init__(self) -> None: super().__init__() @@ -54,7 +54,7 @@ class NullObserverSubscriber(rx.core.typing.Observer): # # This class can subscribe to an `Observable` and print out each item. # -class PrintingObserverSubscriber(rx.core.typing.Observer): +class PrintingObserverSubscriber(rx.core.Observer): def __init__(self, report_no_output: bool) -> None: super().__init__() self.report_no_output = report_no_output @@ -90,7 +90,7 @@ class TimestampedPrintingObserverSubscriber(PrintingObserverSubscriber): # # This class can subscribe to an `Observable` and collect each item. # -class CollectingObserverSubscriber(rx.core.typing.Observer): +class CollectingObserverSubscriber(rx.core.Observer): def __init__(self) -> None: self.logger: logging.Logger = logging.getLogger(self.__class__.__name__) self.collected: typing.List[typing.Any] = [] @@ -134,7 +134,7 @@ TItem = typing.TypeVar('TItem') # # This class can subscribe to an `Observable` and capture the latest item as it is observed. # -class LatestItemObserverSubscriber(rx.core.typing.Observer, typing.Generic[TItem]): +class LatestItemObserverSubscriber(rx.core.Observer, typing.Generic[TItem]): def __init__(self, initial: TItem) -> None: super().__init__() self.latest: TItem = initial @@ -159,7 +159,7 @@ class LatestItemObserverSubscriber(rx.core.typing.Observer, typing.Generic[TItem # This is mostly for libraries (like `rxpy_backpressure`) that take observers but not their # component functions. # -class FunctionObserver(rx.core.typing.Observer): +class FunctionObserver(rx.core.Observer): def __init__(self, on_next: typing.Callable[[typing.Any], None], on_error: typing.Callable[[Exception], None] = lambda _: None, @@ -196,7 +196,7 @@ class FunctionObserver(rx.core.typing.Observer): # take multiple seconds to complete. In that case, the latest item will be immediately # emitted and the in-between items skipped. # -def create_backpressure_skipping_observer(on_next: typing.Callable[[typing.Any], None], on_error: typing.Callable[[Exception], None] = lambda _: None, on_completed: typing.Callable[[], None] = lambda: None) -> rx.core.typing.Observer: +def create_backpressure_skipping_observer(on_next: typing.Callable[[typing.Any], None], on_error: typing.Callable[[Exception], None] = lambda _: None, on_completed: typing.Callable[[], None] = lambda: None) -> rx.core.Observer: observer = FunctionObserver(on_next=on_next, on_error=on_error, on_completed=on_completed) return BackPressure.LATEST(observer) diff --git a/mango/oracle.py b/mango/oracle.py index c4adff1..12c59db 100644 --- a/mango/oracle.py +++ b/mango/oracle.py @@ -114,7 +114,7 @@ class Oracle(metaclass=abc.ABCMeta): raise NotImplementedError("Oracle.fetch_price() is not implemented on the base type.") @abc.abstractmethod - def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable: + def to_streaming_observable(self, context: Context) -> rx.core.Observable: raise NotImplementedError("Oracle.fetch_price() is not implemented on the base type.") diff --git a/mango/oracles/ftx/ftx.py b/mango/oracles/ftx/ftx.py index 10c9f5e..502961b 100644 --- a/mango/oracles/ftx/ftx.py +++ b/mango/oracles/ftx/ftx.py @@ -74,7 +74,7 @@ class FtxOracle(Oracle): return Price(self.source, datetime.now(), self.market, bid, price, ask, FtxOracleConfidence) - def to_streaming_observable(self, _: Context) -> rx.core.typing.Observable: + def to_streaming_observable(self, _: Context) -> rx.core.Observable: subject = Subject() def _on_item(data): diff --git a/mango/oracles/pythnetwork/pythnetwork.py b/mango/oracles/pythnetwork/pythnetwork.py index b8f97b9..e25f3fd 100644 --- a/mango/oracles/pythnetwork/pythnetwork.py +++ b/mango/oracles/pythnetwork/pythnetwork.py @@ -90,7 +90,7 @@ class PythOracle(Oracle): # Pyth has no notion of bids, asks, or spreads so just provide the single price. return Price(self.source, datetime.now(), self.market, price, price, price, confidence) - def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable: + def to_streaming_observable(self, context: Context) -> rx.core.Observable: return rx.interval(1).pipe( ops.observe_on(context.pool_scheduler), ops.start_with(-1), diff --git a/mango/oracles/serum/serum.py b/mango/oracles/serum/serum.py index 8668a3d..f871a8b 100644 --- a/mango/oracles/serum/serum.py +++ b/mango/oracles/serum/serum.py @@ -87,7 +87,7 @@ class SerumOracle(Oracle): return Price(self.source, datetime.now(), self.market, top_bid, mid_price, top_ask, SerumOracleConfidence) - def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable: + def to_streaming_observable(self, context: Context) -> rx.core.Observable: return rx.interval(1).pipe( ops.observe_on(context.pool_scheduler), ops.start_with(-1), diff --git a/mango/oracles/stub/stub.py b/mango/oracles/stub/stub.py index 6395ee4..62f169f 100644 --- a/mango/oracles/stub/stub.py +++ b/mango/oracles/stub/stub.py @@ -75,7 +75,7 @@ class StubOracle(Oracle): # will give you the consistent results, but you'll need to adjust your code" return Price(self.source, datetime.now(), self.market, raw_price.price, raw_price.price, raw_price.price, StubOracleConfidence) - def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable: + def to_streaming_observable(self, context: Context) -> rx.core.Observable: return rx.interval(1).pipe( ops.observe_on(context.pool_scheduler), ops.start_with(-1), diff --git a/mango/reconnectingwebsocket.py b/mango/reconnectingwebsocket.py index 9e3d290..ba6219f 100644 --- a/mango/reconnectingwebsocket.py +++ b/mango/reconnectingwebsocket.py @@ -18,7 +18,6 @@ import json import logging import rx import rx.subject -import rx.core.typing import typing import websocket # type: ignore