Changed RX subscribe_on to observe_on.
This commit is contained in:
parent
7bbe476b2e
commit
d7f117175f
|
@ -63,7 +63,7 @@ def start_subscriptions(context: mango.Context, liquidation_processor: mango.Liq
|
||||||
|
|
||||||
logging.info("Starting margin account fetcher subscription")
|
logging.info("Starting margin account fetcher subscription")
|
||||||
account_subscription = rx.interval(float(throttle_reload_to_seconds)).pipe(
|
account_subscription = rx.interval(float(throttle_reload_to_seconds)).pipe(
|
||||||
ops.subscribe_on(context.pool_scheduler),
|
ops.observe_on(context.pool_scheduler),
|
||||||
ops.start_with(-1),
|
ops.start_with(-1),
|
||||||
ops.map(fetch_accounts(context)),
|
ops.map(fetch_accounts(context)),
|
||||||
ops.catch(mango.observable_pipeline_error_reporter),
|
ops.catch(mango.observable_pipeline_error_reporter),
|
||||||
|
@ -72,7 +72,7 @@ def start_subscriptions(context: mango.Context, liquidation_processor: mango.Liq
|
||||||
|
|
||||||
logging.info("Starting price fetcher subscription")
|
logging.info("Starting price fetcher subscription")
|
||||||
price_subscription = rx.interval(float(throttle_ripe_update_to_seconds)).pipe(
|
price_subscription = rx.interval(float(throttle_ripe_update_to_seconds)).pipe(
|
||||||
ops.subscribe_on(context.pool_scheduler),
|
ops.observe_on(context.pool_scheduler),
|
||||||
ops.map(fetch_prices(context)),
|
ops.map(fetch_prices(context)),
|
||||||
ops.catch(mango.observable_pipeline_error_reporter),
|
ops.catch(mango.observable_pipeline_error_reporter),
|
||||||
ops.retry()
|
ops.retry()
|
||||||
|
|
|
@ -92,7 +92,7 @@ class PythOracle(Oracle):
|
||||||
|
|
||||||
def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable:
|
def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable:
|
||||||
return rx.interval(1).pipe(
|
return rx.interval(1).pipe(
|
||||||
ops.subscribe_on(context.pool_scheduler),
|
ops.observe_on(context.pool_scheduler),
|
||||||
ops.start_with(-1),
|
ops.start_with(-1),
|
||||||
ops.map(lambda _: self.fetch_price(context)),
|
ops.map(lambda _: self.fetch_price(context)),
|
||||||
ops.catch(observable_pipeline_error_reporter),
|
ops.catch(observable_pipeline_error_reporter),
|
||||||
|
|
|
@ -87,7 +87,7 @@ class SerumOracle(Oracle):
|
||||||
|
|
||||||
def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable:
|
def to_streaming_observable(self, context: Context) -> rx.core.typing.Observable:
|
||||||
return rx.interval(1).pipe(
|
return rx.interval(1).pipe(
|
||||||
ops.subscribe_on(context.pool_scheduler),
|
ops.observe_on(context.pool_scheduler),
|
||||||
ops.start_with(-1),
|
ops.start_with(-1),
|
||||||
ops.map(lambda _: self.fetch_price(context)),
|
ops.map(lambda _: self.fetch_price(context)),
|
||||||
ops.catch(observable_pipeline_error_reporter),
|
ops.catch(observable_pipeline_error_reporter),
|
||||||
|
|
Loading…
Reference in New Issue