2021-07-12 02:57:05 -07:00
|
|
|
# # ⚠ Warning
|
|
|
|
#
|
|
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
|
|
|
# LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|
|
|
# NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
|
|
|
|
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
|
|
|
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
|
|
#
|
|
|
|
# [🥭 Mango Markets](https://mango.markets/) support is available at:
|
|
|
|
# [Docs](https://docs.mango.markets/)
|
|
|
|
# [Discord](https://discord.gg/67jySBhxrg)
|
|
|
|
# [Twitter](https://twitter.com/mangomarkets)
|
|
|
|
# [Github](https://github.com/blockworks-foundation)
|
|
|
|
# [Email](mailto:hello@blockworks.foundation)
|
|
|
|
|
|
|
|
|
|
|
|
import logging
|
|
|
|
import typing
|
|
|
|
import websocket
|
|
|
|
|
|
|
|
from rx.core.typing import Disposable
|
|
|
|
from solana.publickey import PublicKey
|
|
|
|
from solana.rpc.types import RPCResponse
|
|
|
|
|
|
|
|
from .accountinfo import AccountInfo
|
|
|
|
from .context import Context
|
|
|
|
from .observables import EventSource
|
|
|
|
|
|
|
|
|
|
|
|
# # 🥭 WebSocketSubscription class
|
|
|
|
#
|
|
|
|
# The `WebSocketSubscription` maintains a mapping for an account subscription in a Solana websocket to
|
|
|
|
# an actual instantiated object.
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
|
|
# Type variable for the kind of object a `WebSocketSubscription` constructs from an `AccountInfo`.
TSubscriptionInstance = typing.TypeVar('TSubscriptionInstance')
|
|
|
|
|
|
|
|
|
|
|
|
class WebSocketSubscription(Disposable, typing.Generic[TSubscriptionInstance]):
    """Pairs one account address with the callable that turns its updates into objects.

    Attributes:
        address: the account this subscription is for.
        id: client-side request ID taken from `context.random_client_id()`; it is sent in the
            subscribe request built by `build_request()`.
        subscription_id: starts at 0 and is overwritten once a subscription ID is recorded for
            this object.
        from_account_info: callable that converts an `AccountInfo` into the subscribed type.
        publisher: `EventSource` on which built instances are published.
    """

    def __init__(self, context: Context, address: PublicKey, constructor: typing.Callable[[AccountInfo], TSubscriptionInstance]) -> None:
        self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self.address = address
        self.id = context.random_client_id()
        # Placeholder value; replaced when a real subscription ID is assigned.
        self.subscription_id = 0
        self.from_account_info: typing.Callable[[AccountInfo], TSubscriptionInstance] = constructor
        self.publisher: EventSource[TSubscriptionInstance] = EventSource[TSubscriptionInstance]()

    def build_request(self) -> str:
        """Return the JSON-RPC 2.0 `accountSubscribe` request for `self.address` as a string.

        The request is serialised with `json.dumps` rather than assembled from string
        fragments, so the result is always well-formed JSON. As before, the request ID is sent
        as a JSON string and the options are base64 encoding with "processed" commitment.
        """
        # Function-scope import so this block does not depend on the module's import list.
        import json

        request = {
            "jsonrpc": "2.0",
            "id": str(self.id),
            "method": "accountSubscribe",
            "params": [
                str(self.address),
                {
                    "encoding": "base64",
                    "commitment": "processed",
                },
            ],
        }
        return json.dumps(request, indent=4)

    def build_account_info(self, response: RPCResponse) -> AccountInfo:
        """Create an `AccountInfo` for `self.address` from `response`."""
        return AccountInfo.from_response(response, self.address)

    def build(self, response: RPCResponse) -> TSubscriptionInstance:
        """Convert `response` to an `AccountInfo` and pass it to the stored constructor."""
        account_info: AccountInfo = self.build_account_info(response)
        return self.from_account_info(account_info)

    def dispose(self) -> None:
        """Signal completion on the publisher, then dispose of it."""
        self.publisher.on_completed()
        self.publisher.dispose()
|
|
|
|
|
|
|
|
|
|
|
|
# # 🥭 WebSocketSubscriptionManager class
|
|
|
|
#
|
|
|
|
# The `WebSocketSubscriptionManager` takes websocket account updates and sends them to the correct
|
|
|
|
# `WebSocketSubscription`.
|
|
|
|
#
|
|
|
|
|
|
|
|
|
|
|
|
class WebSocketSubscriptionManager(Disposable):
    """Holds `WebSocketSubscription`s and dispatches incoming websocket messages to them.

    Messages are handled by `on_item()`. A message without a "method" key is treated as the
    reply to a subscribe request; an "accountNotification" message is built into an object
    and published on the matching subscription's publisher.
    """

    def __init__(self) -> None:
        self.logger: logging.Logger = logging.getLogger(self.__class__.__name__)
        self.subscriptions: typing.List[WebSocketSubscription] = []

    def add(self, subscription: WebSocketSubscription) -> None:
        """Register `subscription` with this manager."""
        self.subscriptions.append(subscription)

    def add_subscription_id(self, id, subscription_id) -> None:
        """Store `subscription_id` on the subscription whose request ID equals `id`.

        Logs an error (and changes nothing) when no registered subscription has that ID.
        """
        for subscription in self.subscriptions:
            if subscription.id == id:
                self.logger.info(
                    f"Setting ID {subscription_id} on subscription {subscription.id} for {subscription.address}.")
                subscription.subscription_id = subscription_id
                return
        self.logger.error(f"Subscription ID {id} not found")

    def subscription_by_subscription_id(self, subscription_id) -> WebSocketSubscription:
        """Return the first subscription whose `subscription_id` equals the given value.

        Raises:
            Exception: if no registered subscription has that subscription ID.
        """
        for subscription in self.subscriptions:
            if subscription.subscription_id == subscription_id:
                return subscription
        raise Exception(f"No subscription with subscription ID {subscription_id} could be found.")

    def on_item(self, response) -> None:
        """Handle one decoded websocket message.

        * No "method" key and an "error" key: the request failed; the error is logged and
          nothing else happens.
        * No "method" key otherwise: the message is a subscribe confirmation; its "result" is
          recorded as the subscription ID for the request named by "id".
        * "method" == "accountNotification": the payload is built by the matching subscription
          and published on its publisher.
        * Anything else is logged as an unknown response.
        """
        if "method" not in response:
            if "error" in response:
                # A JSON-RPC error reply carries no "result" (and its "id" may be null), so it
                # must not go through the int() conversions below.
                failed_id = response["id"] if "id" in response else None
                self.logger.error(f"Subscription request {failed_id} failed: {response['error']}")
                return
            request_id: int = int(response["id"])
            subscription_id: int = int(response["result"])
            self.add_subscription_id(request_id, subscription_id)
        elif response["method"] == "accountNotification":
            subscription_id = response["params"]["subscription"]
            subscription = self.subscription_by_subscription_id(subscription_id)
            built = subscription.build(response["params"])
            subscription.publisher.publish(built)
        else:
            self.logger.error(f"Unknown response: {response}")

    def open_handler(self, ws: websocket.WebSocketApp) -> None:
        """Send the subscribe request of every registered subscription over `ws`."""
        for subscription in self.subscriptions:
            ws.send(subscription.build_request())

    def dispose(self) -> None:
        """Dispose of every registered subscription."""
        for subscription in self.subscriptions:
            subscription.dispose()
|