mango-explorer/mango/websocketsubscription.py

300 lines
11 KiB
Python
Raw Normal View History

# # ⚠ Warning
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
# LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
# NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# [🥭 Mango Markets](https://mango.markets/) support is available at:
# [Docs](https://docs.mango.markets/)
# [Discord](https://discord.gg/67jySBhxrg)
# [Twitter](https://twitter.com/mangomarkets)
# [Github](https://github.com/blockworks-foundation)
# [Email](mailto:hello@blockworks.foundation)
import abc
import logging
import typing
import websocket
from datetime import datetime
from rx.subject import BehaviorSubject
from rx.core.typing import Disposable
from solana.publickey import PublicKey
from solana.rpc.types import RPCResponse
from .accountinfo import AccountInfo
from .context import Context
from .observables import EventSource
from .reconnectingwebsocket import ReconnectingWebsocket
# # 🥭 WebSocketSubscription class
#
# The `WebSocketSubscription` maintains a mapping for an account subscription in a Solana websocket to
# an actual instantiated object.
#
TSubscriptionInstance = typing.TypeVar('TSubscriptionInstance')
class WebSocketSubscription(Disposable, typing.Generic[TSubscriptionInstance], metaclass=abc.ABCMeta):
def __init__(self, context: Context, address: PublicKey, constructor: typing.Callable[[AccountInfo], TSubscriptionInstance]):
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self.context: Context = context
self.address: PublicKey = address
self.id: int = context.random_client_id()
self.subscription_id: int = 0
self.from_account_info: typing.Callable[[AccountInfo], TSubscriptionInstance] = constructor
self.publisher: EventSource[TSubscriptionInstance] = EventSource[TSubscriptionInstance]()
self.ws: typing.Optional[ReconnectingWebsocket] = None
self.pong: BehaviorSubject = BehaviorSubject(datetime.now())
self.pong_subscription: typing.Optional[Disposable] = None
@abc.abstractmethod
def build_request(self) -> str:
raise NotImplementedError("WebSocketSubscription.build_request() is not implemented on the base type.")
def open(self,) -> None:
websocket_url = self.context.client.cluster_url.replace("https", "wss", 1)
ws: ReconnectingWebsocket = ReconnectingWebsocket(websocket_url, lambda sock: sock.send(self.build_request()))
ws.item.subscribe(on_next=self._on_item)
ws.ping_interval = self.context.ping_interval
self.ws = ws
ws.open()
self.pong_subscription = ws.pong.subscribe(self.pong)
def close(self) -> None:
if self.ws is not None:
if self.pong_subscription is not None:
self.pong_subscription.dispose()
self.pong_subscription = None
self.ws.close()
self.ws = None
def _on_item(self, response) -> None:
if "method" not in response:
id: int = int(response["id"])
if id == self.id:
subscription_id: int = int(response["result"])
self.logger.info(f"Subscription created with id {subscription_id}.")
elif (response["method"] == "accountNotification") or (response["method"] == "programNotification") or (response["method"] == "logsNotification"):
subscription_id = response["params"]["subscription"]
built = self.build_subscribed_instance(response["params"])
self.publisher.publish(built)
else:
self.logger.error(f"[{self.context.name}] Unknown response: {response}")
def build_subscribed_instance(self, response: RPCResponse) -> TSubscriptionInstance:
account_info: AccountInfo = AccountInfo.from_response(response, self.address)
built: TSubscriptionInstance = self.from_account_info(account_info)
return built
def dispose(self):
self.publisher.on_completed()
self.publisher.dispose()
if self.ws is not None:
if self.pong_subscription is not None:
self.pong_subscription.dispose()
self.pong_subscription = None
self.ws.close()
self.ws = None
class WebSocketProgramSubscription(WebSocketSubscription[TSubscriptionInstance]):
def __init__(self, context: Context, address: PublicKey, constructor: typing.Callable[[AccountInfo], TSubscriptionInstance]):
super().__init__(context, address, constructor)
def build_request(self) -> str:
return """
{
"jsonrpc": "2.0",
"id": \"""" + str(self.id) + """\",
"method": "programSubscribe",
"params": [\"""" + str(self.address) + """\",
{
"encoding": "base64",
"commitment": \"""" + str(self.context.client.commitment) + """\"
}
]
}
"""
class WebSocketAccountSubscription(WebSocketSubscription[TSubscriptionInstance]):
def __init__(self, context: Context, address: PublicKey, constructor: typing.Callable[[AccountInfo], TSubscriptionInstance]):
super().__init__(context, address, constructor)
def build_request(self) -> str:
return """
{
"jsonrpc": "2.0",
"id": \"""" + str(self.id) + """\",
"method": "accountSubscribe",
"params": [\"""" + str(self.address) + """\",
{
"encoding": "base64",
"commitment": \"""" + str(self.context.client.commitment) + """\"
}
]
}
"""
class LogEvent:
def __init__(self, signatures: typing.Sequence[str], logs: typing.Sequence[str]):
self.signatures: typing.Sequence[str] = signatures
self.logs: typing.Sequence[str] = logs
@staticmethod
def from_response(response) -> "LogEvent":
signature_text: str = response["result"]["value"]["signature"]
signatures = signature_text.split(",")
logs = response["result"]["value"]["logs"]
return LogEvent(signatures, logs)
2021-08-01 10:03:46 -07:00
def __str__(self) -> str:
logs = "\n ".join(self.logs)
return f"""« 𝙻𝚘𝚐𝙴𝚟𝚎𝚗𝚝 {self.signatures}
{logs}
»"""
def __repr__(self) -> str:
return f"{self}"
class WebSocketLogSubscription(WebSocketSubscription[LogEvent]):
def __init__(self, context: Context, address: PublicKey):
super().__init__(context, address, lambda _: LogEvent([""], []))
def build_request(self) -> str:
return """
{
"jsonrpc": "2.0",
"id": \"""" + str(self.id) + """\",
"method": "logsSubscribe",
"params": [
{
"mentions": [ \"""" + str(self.address) + """\" ]
},
{
"commitment": \"""" + str(self.context.client.commitment) + """\"
}
]
}
"""
def build(self, response: RPCResponse) -> LogEvent:
return LogEvent.from_response(response)
# # 🥭 WebSocketSubscriptionManager class
#
# The `WebSocketSubscriptionManager` is a base class for different websocket management approaches.
#
class WebSocketSubscriptionManager(Disposable, metaclass=abc.ABCMeta):
def __init__(self, context: Context, ping_interval: int = 10):
self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
self.context: Context = context
self.ping_interval: int = ping_interval
self.subscriptions: typing.List[WebSocketSubscription] = []
def add(self, subscription: WebSocketSubscription) -> None:
self.subscriptions += [subscription]
def open(self) -> None:
raise NotImplementedError("WebSocketSubscription.build_request() is not implemented on the base type.")
def close(self) -> None:
raise NotImplementedError("WebSocketSubscription.build_request() is not implemented on the base type.")
def on_disconnected(self, ws: websocket.WebSocketApp):
for subscription in self.subscriptions:
subscription.dispose()
self.subscriptions = []
def dispose(self):
for subscription in self.subscriptions:
subscription.dispose()
# # 🥭 IndividualWebSocketSubscriptionManager class
#
# The `IndividualWebSocketSubscriptionManager` runs `WebSocketSubscription`s each in their own websocket.
#
class IndividualWebSocketSubscriptionManager(WebSocketSubscriptionManager):
def __init__(self, context: Context, ping_interval: int = 10):
super().__init__(context, ping_interval)
def open(self) -> None:
for subscription in self.subscriptions:
subscription.open()
def close(self) -> None:
for subscription in self.subscriptions:
subscription.close()
# # 🥭 SharedWebSocketSubscriptionManager class
#
# The `SharedWebSocketSubscriptionManager` runs a single websocket and sends updates to the correct
# `WebSocketSubscription`.
#
class SharedWebSocketSubscriptionManager(WebSocketSubscriptionManager):
def __init__(self, context: Context, ping_interval: int = 10):
super().__init__(context, ping_interval)
self.ws: typing.Optional[ReconnectingWebsocket] = None
def open(self) -> None:
websocket_url = self.context.client.cluster_url.replace("https", "wss", 1)
ws: ReconnectingWebsocket = ReconnectingWebsocket(websocket_url, self.open_handler)
ws.item.subscribe(on_next=self.on_item)
ws.ping_interval = self.ping_interval
self.ws = ws
ws.open()
def close(self) -> None:
if self.ws is not None:
self.ws.close()
self.ws = None
def add_subscription_id(self, id, subscription_id) -> None:
for subscription in self.subscriptions:
if subscription.id == id:
self.logger.info(
f"Setting ID {subscription_id} on subscription {subscription.id} for {subscription.address}.")
subscription.subscription_id = subscription_id
return
self.logger.error(f"[{self.context.name}] Subscription ID {id} not found")
def subscription_by_subscription_id(self, subscription_id) -> WebSocketSubscription:
for subscription in self.subscriptions:
if subscription.subscription_id == subscription_id:
return subscription
raise Exception(f"[{self.context.name}] No subscription with subscription ID {subscription_id} could be found.")
def on_item(self, response) -> None:
if "method" not in response:
id: int = int(response["id"])
subscription_id: int = int(response["result"])
self.add_subscription_id(id, subscription_id)
elif (response["method"] == "accountNotification") or (response["method"] == "programNotification") or (response["method"] == "logsNotification"):
subscription_id = response["params"]["subscription"]
subscription = self.subscription_by_subscription_id(subscription_id)
built = subscription.build_subscribed_instance(response["params"])
subscription.publisher.publish(built)
else:
self.logger.error(f"[{self.context.name}] Unknown response: {response}")
def open_handler(self, ws: websocket.WebSocketApp):
for subscription in self.subscriptions:
ws.send(subscription.build_request())
def dispose(self):
super().dispose()
if self.ws is not None:
self.ws.close()
self.ws = None