pyth-crosschain/third_party/pyth/pyth_publisher.py

183 lines
5.7 KiB
Python

#!/usr/bin/env python3
from pyth_utils import *
from http.server import HTTPServer, BaseHTTPRequestHandler
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import os
import random
import sys
import threading
import time
# The mock publisher needs to fund the publisher identity account,
# unable to use a separate payer
SOL_AIRDROP_AMT = int(os.environ.get("SOL_AIRDROP_AMT", 0))
class PythAccEndpoint(BaseHTTPRequestHandler):
"""
A dumb endpoint to respond with a JSON containing Pyth symbol and mapping addresses
"""
def do_GET(self):
print(f"Got path {self.path}")
sys.stdout.flush()
data = json.dumps(HTTP_ENDPOINT_DATA).encode("utf-8")
print(f"Sending:\n{data}")
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
self.wfile.flush()
# Test publisher state that gets served via the HTTP endpoint. Note: the schema of this dict is extended here and there
# all_symbols_added is set to True once all dynamically-created symbols are added to the on-chain program. This
# flag allows the integration test in check_attestations.py to determine that every on-chain symbol is being attested.
HTTP_ENDPOINT_DATA = {"symbols": [], "mapping_address": None, "all_symbols_added": False}
def publisher_random_update(price_pubkey):
"""
Update the specified price with random values
"""
value = random.randrange(1000, 2000)
confidence = random.randrange(1, 10)
pyth_run_or_die("upd_price_val", args=[
price_pubkey, str(value), str(confidence), "trading"
])
print(f"Price {price_pubkey} value updated to {str(value)}!")
def accounts_endpoint():
"""
Run a barebones HTTP server to share the dynamic Pyth
mapping/product/price account addresses
"""
server_address = ('', 4242)
httpd = HTTPServer(server_address, PythAccEndpoint)
httpd.serve_forever()
def add_symbol(num: int):
"""
NOTE: Updates HTTP_ENDPOINT_DATA
"""
symbol_name = f"Test symbol {num}"
# Add a product
prod_pubkey = pyth_admin_run_or_die(
"add_product", capture_output=True).stdout.strip()
print(f"{symbol_name}: Added product {prod_pubkey}")
# Add a price
price_pubkey = pyth_admin_run_or_die(
"add_price",
args=[prod_pubkey, "price"],
capture_output=True
).stdout.strip()
print(f"{symbol_name}: Added price {price_pubkey}")
# Become a publisher for the new price
pyth_admin_run_or_die(
"add_publisher", args=[publisher_pubkey, price_pubkey],
debug=True,
capture_output=True)
print(f"{symbol_name}: Added publisher {publisher_pubkey}")
# Update the prices as the newly added publisher
publisher_random_update(price_pubkey)
sym = {
"name": symbol_name,
"product": prod_pubkey,
"price": price_pubkey
}
HTTP_ENDPOINT_DATA["symbols"].append(sym)
sys.stdout.flush()
print(f"New symbol: {num}")
return num
# Fund the publisher
sol_run_or_die("airdrop", [
str(SOL_AIRDROP_AMT),
"--keypair", PYTH_PUBLISHER_KEYPAIR,
"--commitment", "finalized",
])
# Create a mapping
pyth_admin_run_or_die("init_mapping", capture_output=True)
mapping_addr = sol_run_or_die("address", args=[
"--keypair", PYTH_MAPPING_KEYPAIR
], capture_output=True).stdout.strip()
HTTP_ENDPOINT_DATA["mapping_addr"] = mapping_addr
print(f"New mapping at {mapping_addr}")
print(f"Creating {PYTH_TEST_SYMBOL_COUNT} test Pyth symbols")
publisher_pubkey = sol_run_or_die("address", args=[
"--keypair", PYTH_PUBLISHER_KEYPAIR
], capture_output=True).stdout.strip()
with ThreadPoolExecutor(max_workers=PYTH_TEST_SYMBOL_COUNT) as executor:
add_symbol_futures = {executor.submit(add_symbol, sym_id) for sym_id in range(PYTH_TEST_SYMBOL_COUNT)}
for future in as_completed(add_symbol_futures):
print(f"Completed {future.result()}")
print(
f"Mock updates ready to roll. Updating every {str(PYTH_PUBLISHER_INTERVAL_SECS)} seconds")
# Spin off the readiness probe endpoint into a separate thread
readiness_thread = threading.Thread(target=readiness, daemon=True)
# Start an HTTP endpoint for looking up test product/price addresses
http_service = threading.Thread(target=accounts_endpoint, daemon=True)
readiness_thread.start()
http_service.start()
next_new_symbol_id = PYTH_TEST_SYMBOL_COUNT
last_new_sym_added_at = time.monotonic()
with ThreadPoolExecutor() as executor: # Used for async adding of products and prices
dynamically_added_symbols = 0
while True:
for sym in HTTP_ENDPOINT_DATA["symbols"]:
publisher_random_update(sym["price"])
# Add a symbol if new symbol interval configured. This will add a new symbol if PYTH_NEW_SYMBOL_INTERVAL_SECS
# is passed since adding the previous symbol. The second constraint ensures that
# at most PYTH_DYNAMIC_SYMBOL_COUNT new price symbols are created.
if PYTH_NEW_SYMBOL_INTERVAL_SECS > 0 and dynamically_added_symbols < PYTH_DYNAMIC_SYMBOL_COUNT:
# Do it if enough time passed
now = time.monotonic()
if (now - last_new_sym_added_at) >= PYTH_NEW_SYMBOL_INTERVAL_SECS:
executor.submit(add_symbol, next_new_symbol_id) # Returns immediately, runs in background
last_sym_added_at = now
next_new_symbol_id += 1
dynamically_added_symbols += 1
if dynamically_added_symbols >= PYTH_DYNAMIC_SYMBOL_COUNT:
HTTP_ENDPOINT_DATA["all_symbols_added"] = True
time.sleep(PYTH_PUBLISHER_INTERVAL_SECS)
sys.stdout.flush()
readiness_thread.join()
http_service.join()