Removed rx.core.typing for many classes - using the actual base classes now.
This commit is contained in:
parent
a4ca18e5ed
commit
d5eaa385da
|
@ -5,7 +5,6 @@ import logging
|
|||
import os
|
||||
import os.path
|
||||
import rx
|
||||
import rx.core.typing
|
||||
import rx.operators as ops
|
||||
import sys
|
||||
import threading
|
||||
|
|
|
@ -30,7 +30,7 @@ class HealthCheck(rx.core.typing.Disposable):
|
|||
self.healthcheck_files_location: str = healthcheck_files_location
|
||||
self._to_dispose: typing.List[rx.core.typing.Disposable] = []
|
||||
|
||||
def add(self, name: str, observable: rx.core.typing.Observable[typing.Any]):
|
||||
def add(self, name: str, observable: rx.core.Observable):
|
||||
healthcheck_file_touch_disposer = observable.subscribe(
|
||||
on_next=lambda _: Path(f"{self.healthcheck_files_location}/mango_healthcheck_{name}").touch(mode=0o666, exist_ok=True))
|
||||
self._to_dispose += [healthcheck_file_touch_disposer]
|
||||
|
|
|
@ -36,7 +36,7 @@ from rxpy_backpressure import BackPressure
|
|||
#
|
||||
|
||||
|
||||
class NullObserverSubscriber(rx.core.typing.Observer):
|
||||
class NullObserverSubscriber(rx.core.Observer):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
|
||||
|
@ -54,7 +54,7 @@ class NullObserverSubscriber(rx.core.typing.Observer):
|
|||
#
|
||||
# This class can subscribe to an `Observable` and print out each item.
|
||||
#
|
||||
class PrintingObserverSubscriber(rx.core.typing.Observer):
|
||||
class PrintingObserverSubscriber(rx.core.Observer):
|
||||
def __init__(self, report_no_output: bool) -> None:
|
||||
super().__init__()
|
||||
self.report_no_output = report_no_output
|
||||
|
@ -90,7 +90,7 @@ class TimestampedPrintingObserverSubscriber(PrintingObserverSubscriber):
|
|||
#
|
||||
# This class can subscribe to an `Observable` and collect each item.
|
||||
#
|
||||
class CollectingObserverSubscriber(rx.core.typing.Observer):
|
||||
class CollectingObserverSubscriber(rx.core.Observer):
|
||||
def __init__(self) -> None:
|
||||
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
|
||||
self.collected: typing.List[typing.Any] = []
|
||||
|
@ -134,7 +134,7 @@ TItem = typing.TypeVar('TItem')
|
|||
#
|
||||
# This class can subscribe to an `Observable` and capture the latest item as it is observed.
|
||||
#
|
||||
class LatestItemObserverSubscriber(rx.core.typing.Observer, typing.Generic[TItem]):
|
||||
class LatestItemObserverSubscriber(rx.core.Observer, typing.Generic[TItem]):
|
||||
def __init__(self, initial: TItem) -> None:
|
||||
super().__init__()
|
||||
self.latest: TItem = initial
|
||||
|
@ -159,7 +159,7 @@ class LatestItemObserverSubscriber(rx.core.typing.Observer, typing.Generic[TItem
|
|||
# This is mostly for libraries (like `rxpy_backpressure`) that take observers but not their
|
||||
# component functions.
|
||||
#
|
||||
class FunctionObserver(rx.core.typing.Observer):
|
||||
class FunctionObserver(rx.core.Observer):
|
||||
def __init__(self,
|
||||
on_next: typing.Callable[[typing.Any], None],
|
||||
on_error: typing.Callable[[Exception], None] = lambda _: None,
|
||||
|
@ -196,7 +196,7 @@ class FunctionObserver(rx.core.typing.Observer):
|
|||
# take multiple seconds to complete. In that case, the latest item will be immediately
|
||||
# emitted and the in-between items skipped.
|
||||
#
|
||||
def create_backpressure_skipping_observer(on_next: typing.Callable[[typing.Any], None], on_error: typing.Callable[[Exception], None] = lambda _: None, on_completed: typing.Callable[[], None] = lambda: None) -> rx.core.typing.Observer:
|
||||
def create_backpressure_skipping_observer(on_next: typing.Callable[[typing.Any], None], on_error: typing.Callable[[Exception], None] = lambda _: None, on_completed: typing.Callable[[], None] = lambda: None) -> rx.core.Observer:
|
||||
observer = FunctionObserver(on_next=on_next, on_error=on_error, on_completed=on_completed)
|
||||
return BackPressure.LATEST(observer)
|
||||
|
||||
|
|
|
@ -114,7 +114,7 @@ class Oracle(metaclass=abc.ABCMeta):
|
|||
raise NotImplementedError("Oracle.fetch_price() is not implemented on the base type.")
|
||||
|
||||
@abc.abstractmethod
|
||||
def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable:
|
||||
def to_streaming_observable(self, context: Context) -> rx.core.Observable:
|
||||
raise NotImplementedError("Oracle.fetch_price() is not implemented on the base type.")
|
||||
|
||||
|
||||
|
|
|
@ -74,7 +74,7 @@ class FtxOracle(Oracle):
|
|||
|
||||
return Price(self.source, datetime.now(), self.market, bid, price, ask, FtxOracleConfidence)
|
||||
|
||||
def to_streaming_observable(self, _: Context) -> rx.core.typing.Observable:
|
||||
def to_streaming_observable(self, _: Context) -> rx.core.Observable:
|
||||
subject = Subject()
|
||||
|
||||
def _on_item(data):
|
||||
|
|
|
@ -90,7 +90,7 @@ class PythOracle(Oracle):
|
|||
# Pyth has no notion of bids, asks, or spreads so just provide the single price.
|
||||
return Price(self.source, datetime.now(), self.market, price, price, price, confidence)
|
||||
|
||||
def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable:
|
||||
def to_streaming_observable(self, context: Context) -> rx.core.Observable:
|
||||
return rx.interval(1).pipe(
|
||||
ops.observe_on(context.pool_scheduler),
|
||||
ops.start_with(-1),
|
||||
|
|
|
@ -87,7 +87,7 @@ class SerumOracle(Oracle):
|
|||
|
||||
return Price(self.source, datetime.now(), self.market, top_bid, mid_price, top_ask, SerumOracleConfidence)
|
||||
|
||||
def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable:
|
||||
def to_streaming_observable(self, context: Context) -> rx.core.Observable:
|
||||
return rx.interval(1).pipe(
|
||||
ops.observe_on(context.pool_scheduler),
|
||||
ops.start_with(-1),
|
||||
|
|
|
@ -75,7 +75,7 @@ class StubOracle(Oracle):
|
|||
# will give you the consistent results, but you'll need to adjust your code"
|
||||
return Price(self.source, datetime.now(), self.market, raw_price.price, raw_price.price, raw_price.price, StubOracleConfidence)
|
||||
|
||||
def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable:
|
||||
def to_streaming_observable(self, context: Context) -> rx.core.Observable:
|
||||
return rx.interval(1).pipe(
|
||||
ops.observe_on(context.pool_scheduler),
|
||||
ops.start_with(-1),
|
||||
|
|
|
@ -18,7 +18,6 @@ import json
|
|||
import logging
|
||||
import rx
|
||||
import rx.subject
|
||||
import rx.core.typing
|
||||
import typing
|
||||
import websocket # type: ignore
|
||||
|
||||
|
|
Loading…
Reference in New Issue