mango-explorer/mango/websocketsubscription.py

127 lines
4.8 KiB
Python

# # ⚠ Warning
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
# LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
# NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# [🥭 Mango Markets](https://mango.markets/) support is available at:
# [Docs](https://docs.mango.markets/)
# [Discord](https://discord.gg/67jySBhxrg)
# [Twitter](https://twitter.com/mangomarkets)
# [Github](https://github.com/blockworks-foundation)
# [Email](mailto:hello@blockworks.foundation)
import logging
import typing
import websocket
from rx.core.typing import Disposable
from solana.publickey import PublicKey
from solana.rpc.types import RPCResponse
from .accountinfo import AccountInfo
from .context import Context
from .observables import EventSource
# # 🥭 WebSocketSubscription class
#
# The `WebSocketSubscription` maintains a mapping for an account subscription in a Solana websocket to
# an actual instantiated object.
#
TSubscriptionInstance = typing.TypeVar('TSubscriptionInstance')


class WebSocketSubscription(Disposable, typing.Generic[TSubscriptionInstance]):
    """Tie one websocket `accountSubscribe` subscription to a typed publisher.

    The subscription knows how to build its own subscribe request, how to turn
    an incoming response into an `AccountInfo`, and how to convert that
    `AccountInfo` into a `TSubscriptionInstance` using the supplied constructor.

    Attributes are documented inline in `__init__()`.
    """

    def __init__(self, context: Context, address: PublicKey,
                 constructor: typing.Callable[[AccountInfo], TSubscriptionInstance]):
        """Create a subscription for `address`.

        Args:
            context: Used only to obtain a random client ID for the request.
            address: The account to subscribe to.
            constructor: Callable that converts an `AccountInfo` into the
                object type this subscription produces.
        """
        self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self.address = address
        # Client-side request ID, sent as the "id" member of the subscribe request.
        self.id = context.random_client_id()
        # Starts at 0 and is expected to be overwritten from outside once the
        # subscription ID for this request is known.
        self.subscription_id = 0
        self.from_account_info: typing.Callable[[AccountInfo], TSubscriptionInstance] = constructor
        self.publisher: EventSource[TSubscriptionInstance] = EventSource[TSubscriptionInstance]()

    def build_request(self) -> str:
        """Return the JSON-RPC 2.0 `accountSubscribe` request as a JSON string.

        The request is serialised with `json.dumps` rather than assembled by
        string concatenation, so every value is correctly quoted and escaped.
        The "id" member is sent as a string, and the account is requested with
        "base64" encoding at "processed" commitment.
        """
        request = {
            "jsonrpc": "2.0",
            "id": str(self.id),
            "method": "accountSubscribe",
            "params": [
                str(self.address),
                {
                    "encoding": "base64",
                    "commitment": "processed"
                }
            ]
        }
        return json.dumps(request)

    def build_account_info(self, response: RPCResponse) -> AccountInfo:
        """Build an `AccountInfo` for this subscription's address from `response`."""
        return AccountInfo.from_response(response, self.address)

    def build(self, response: RPCResponse) -> TSubscriptionInstance:
        """Convert `response` into a `TSubscriptionInstance`.

        The response is first turned into an `AccountInfo`, which is then passed
        to the constructor supplied at creation time.
        """
        account_info: AccountInfo = self.build_account_info(response)
        built: TSubscriptionInstance = self.from_account_info(account_info)
        return built

    def dispose(self):
        """Signal completion on the publisher and then dispose of it."""
        self.publisher.on_completed()
        self.publisher.dispose()
# # 🥭 WebSocketSubscriptionManager class
#
# The `WebSocketSubscriptionManager` takes websocket account updates and sends them to the correct
# `WebSocketSubscription`.
#
class WebSocketSubscriptionManager(Disposable):
    """Dispatch incoming websocket messages to the right `WebSocketSubscription`.

    Subscriptions are registered with `add()`. When the socket opens,
    `open_handler()` sends each subscription's request. Incoming messages are
    passed to `on_item()`, which either records the subscription ID for a
    pending request or builds and publishes an update on the matching
    subscription's publisher.
    """

    def __init__(self):
        """Create a manager with no registered subscriptions."""
        self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self.subscriptions: typing.List[WebSocketSubscription] = []

    def add(self, subscription: WebSocketSubscription) -> None:
        """Register `subscription` with this manager."""
        self.subscriptions.append(subscription)

    def add_subscription_id(self, id, subscription_id) -> None:
        """Record `subscription_id` on the subscription whose client ID is `id`.

        Only the first subscription with a matching client ID is updated. If no
        subscription matches, an error is logged and nothing else happens.
        """
        for subscription in self.subscriptions:
            if subscription.id == id:
                self.logger.info(
                    f"Setting ID {subscription_id} on subscription {subscription.address}/{subscription.id}.")
                subscription.subscription_id = subscription_id
                return
        self.logger.error(f"Subscription ID {id} not found")

    def subscription_by_subscription_id(self, subscription_id) -> WebSocketSubscription:
        """Return the first subscription whose `subscription_id` matches.

        Raises:
            Exception: If no registered subscription has that subscription ID.
        """
        for subscription in self.subscriptions:
            if subscription.subscription_id == subscription_id:
                return subscription
        raise Exception(f"No subscription with subscription ID {subscription_id} could be found.")

    def on_item(self, response) -> None:
        """Handle one decoded websocket message.

        * A message without a "method" member is treated as the reply to a
          subscribe request: its "id" and "result" are stored as the client ID
          and subscription ID respectively. If such a message carries an
          "error" member instead, it is logged and skipped, because an error
          reply has no "result" to read.
        * An "accountNotification" message is built into an object by the
          matching subscription and published on that subscription's publisher.
        * Any other message is logged as an error.

        Raises:
            Exception: If an "accountNotification" refers to a subscription ID
                that is not registered with this manager.
        """
        if "method" not in response:
            if "error" in response:
                # A JSON-RPC error reply carries "error" instead of "result";
                # reading "result" below would raise an opaque KeyError.
                self.logger.error(f"Error response received: {response}")
                return
            client_id: int = int(response["id"])
            subscription_id: int = int(response["result"])
            self.add_subscription_id(client_id, subscription_id)
        elif response["method"] == "accountNotification":
            subscription_id = response["params"]["subscription"]
            subscription = self.subscription_by_subscription_id(subscription_id)
            built = subscription.build(response["params"])
            subscription.publisher.publish(built)
        else:
            self.logger.error(f"Unknown response: {response}")

    def open_handler(self, ws: websocket.WebSocketApp):
        """Send the subscribe request of every registered subscription over `ws`."""
        for subscription in self.subscriptions:
            ws.send(subscription.build_request())

    def dispose(self):
        """Dispose of every registered subscription."""
        for subscription in self.subscriptions:
            subscription.dispose()