Moved opening of websocket to be entirely within WebSocketSubscriptionManager.

This commit is contained in:
Geoff Taylor 2021-08-21 13:25:28 +01:00
parent 1696ead1e3
commit 153ebae2f4
7 changed files with 36 additions and 54 deletions

View File

@ -47,7 +47,7 @@ group = mango.Group.load(context, context.group_id)
account = mango.Account.load_for_owner_by_index(context, wallet.address, group, args.account_index)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
manager = mango.WebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)
@ -114,12 +114,7 @@ def hedge(event: mango.PerpFillEvent) -> None:
publisher.subscribe(on_next=hedge)
websocket_url = context.client.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
health_check.add("ws_pong", ws.pong)
ws.open()
manager.open()
logging.info(f"Current assets in account {account.address} (owner: {account.owner}):")
mango.TokenValue.report([asset for asset in account.net_assets if asset is not None], logging.info)
@ -132,6 +127,5 @@ except:
pass
logging.info("Shutting down...")
ws.close()
disposer.dispose()
logging.info("Shutdown complete.")

View File

@ -25,7 +25,7 @@ logging.warning(mango.WARNING_DISCLAIMER_TEXT)
context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
manager = mango.WebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
log_subscription = mango.WebSocketLogSubscription(context, args.address)
@ -34,11 +34,7 @@ publisher = log_subscription.publisher
publisher.subscribe(mango.PrintingObserverSubscriber(False))
websocket_url = context.client.cluster_url.replace("https", "wss", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws.open()
manager.open()
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
@ -48,6 +44,5 @@ except:
pass
logging.info("Shutting down...")
ws.close()
disposer.dispose()
logging.info("Shutdown complete.")

View File

@ -67,7 +67,7 @@ context = mango.ContextBuilder.from_command_line_parameters(args)
logging.info(f"{context}")
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
manager = mango.WebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)
@ -112,11 +112,7 @@ elif isinstance(market, mango.PerpMarket):
else:
raise Exception(f"Could not determine type of market {market.symbol}")
websocket_url = context.client.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws.open()
manager.open()
order_reconciler = mango.marketmaking.ToleranceOrderReconciler(
args.existing_order_tolerance, args.existing_order_tolerance)
@ -149,7 +145,6 @@ except:
pass
logging.info("Shutting down...")
ws.close()
disposer.dispose()
cleanup(context, wallet, account, market, args.dry_run)
logging.info("Shutdown complete.")

View File

@ -34,7 +34,7 @@ logging.warning(mango.WARNING_DISCLAIMER_TEXT)
context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
manager = mango.WebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
if args.account_type.upper() == "ACCOUNTINFO":
@ -69,11 +69,7 @@ else:
publisher.subscribe(mango.PrintingObserverSubscriber(False))
websocket_url = context.client.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
ws.open()
manager.open()
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
@ -83,6 +79,5 @@ except:
pass
logging.info("Shutting down...")
ws.close()
disposer.dispose()
logging.info("Shutdown complete.")

View File

@ -38,7 +38,7 @@ context.client.commitment = Max
print(context)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
manager = mango.WebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)
@ -63,12 +63,7 @@ publisher.pipe(
rx.operators.retry()
).subscribe(mango.PrintingObserverSubscriber(False))
websocket_url = context.client.cluster_url.replace("https", "wss", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
health_check.add("ws_pong", ws.pong)
ws.open()
manager.open()
# Wait - don't exit. Exiting will be handled by signals/interrupts.
waiter = threading.Event()
@ -78,6 +73,5 @@ except:
pass
logging.info("Shutting down...")
ws.close()
disposer.dispose()
logging.info("Shutdown complete.")

View File

@ -103,7 +103,7 @@ def add_subscription_for_parameter(context: mango.Context, manager: mango.WebSoc
context = mango.ContextBuilder.from_command_line_parameters(args)
disposer = mango.DisposePropagator()
manager = mango.WebSocketSubscriptionManager(context.name)
manager = mango.WebSocketSubscriptionManager(context)
disposer.add_disposable(manager)
health_check = mango.HealthCheck()
disposer.add_disposable(health_check)
@ -111,16 +111,7 @@ disposer.add_disposable(health_check)
for name_and_address in args.named_address:
add_subscription_for_parameter(context, manager, args.timer_limit, name_and_address)
websocket_url = context.client.cluster_url.replace("https", "ws", 1)
ws: mango.ReconnectingWebsocket = mango.ReconnectingWebsocket(websocket_url, manager.open_handler)
ws.item.subscribe(on_next=manager.on_item)
ws.ping_interval = 10
health_check.add("ws_pong", ws.pong)
ws_connecting_log_disposable = ws.connecting.subscribe(
on_next=lambda _: send_event_notification("Re-connecting websocket"))
disposer.add_disposable(ws_connecting_log_disposable)
ws.open()
manager.open()
send_event_notification(f"Starting using context:\n{context}")
@ -132,6 +123,5 @@ except:
pass
logging.info("Shutting down...")
ws.close()
disposer.dispose()
logging.info("Shutdown complete.")

View File

@ -25,6 +25,7 @@ from solana.rpc.types import RPCResponse
from .accountinfo import AccountInfo
from .context import Context
from .observables import EventSource
from .reconnectingwebsocket import ReconnectingWebsocket
# # 🥭 WebSocketSubscription class
@ -157,14 +158,29 @@ class WebSocketLogSubscription(WebSocketSubscription[LogEvent]):
# `WebSocketSubscription`.
#
class WebSocketSubscriptionManager(Disposable):
def __init__(self, program_name: str):
def __init__(self, context: Context, ping_interval: int = 10):
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self.program_name: str = program_name
self.context: Context = context
self.ping_interval: int = ping_interval
self.subscriptions: typing.List[WebSocketSubscription] = []
self.ws: typing.Optional[ReconnectingWebsocket] = None
def add(self, subscription: WebSocketSubscription) -> None:
self.subscriptions += [subscription]
def open(self) -> None:
websocket_url = self.context.client.cluster_url.replace("https", "wss", 1)
ws: ReconnectingWebsocket = ReconnectingWebsocket(websocket_url, self.open_handler)
ws.item.subscribe(on_next=self.on_item)
ws.ping_interval = self.ping_interval
self.ws = ws
ws.open()
def close(self) -> None:
if self.ws is not None:
self.ws.close()
self.ws = None
def add_subscription_id(self, id, subscription_id) -> None:
for subscription in self.subscriptions:
if subscription.id == id:
@ -172,13 +188,13 @@ class WebSocketSubscriptionManager(Disposable):
f"Setting ID {subscription_id} on subscription {subscription.id} for {subscription.address}.")
subscription.subscription_id = subscription_id
return
self.logger.error(f"[{self.program_name}] Subscription ID {id} not found")
self.logger.error(f"[{self.context.name}] Subscription ID {id} not found")
def subscription_by_subscription_id(self, subscription_id) -> WebSocketSubscription:
for subscription in self.subscriptions:
if subscription.subscription_id == subscription_id:
return subscription
raise Exception(f"[{self.program_name}] No subscription with subscription ID {subscription_id} could be found.")
raise Exception(f"[{self.context.name}] No subscription with subscription ID {subscription_id} could be found.")
def on_item(self, response) -> None:
if "method" not in response:
@ -191,7 +207,7 @@ class WebSocketSubscriptionManager(Disposable):
built = subscription.build(response["params"])
subscription.publisher.publish(built)
else:
self.logger.error(f"[{self.program_name}] Unknown response: {response}")
self.logger.error(f"[{self.context.name}] Unknown response: {response}")
def open_handler(self, ws: websocket.WebSocketApp):
for subscription in self.subscriptions:
@ -203,3 +219,6 @@ class WebSocketSubscriptionManager(Disposable):
def dispose(self):
for subscription in self.subscriptions:
subscription.dispose()
if self.ws is not None:
self.ws.close()
self.ws = None