Integration test for attestations (#413)

* integration test ???

* update

* better logging

* hm

* convert ids

* fix conversion

* what

* speed things up

* handle dynamic symbols properly

* pr comments

Co-authored-by: Jayant Krishnamurthy <jkrishnamurthy@jumptrading.com>
This commit is contained in:
Jayant Krishnamurthy 2022-12-06 09:57:43 -08:00 committed by GitHub
parent 4821b877e3
commit a3199b7d28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 147 additions and 20 deletions

View File

@ -203,6 +203,22 @@ k8s_resource(
trigger_mode = trigger_mode,
)
# attestations checking script
docker_build(
ref = "check-attestations",
context = ".",
only = ["./third_party"],
dockerfile = "./third_party/pyth/Dockerfile.check-attestations",
)
k8s_yaml_with_ns("devnet/check-attestations.yaml")
k8s_resource(
"check-attestations",
resource_deps = ["pyth-price-service", "pyth", "p2w-attest"],
labels = ["pyth"],
trigger_mode = trigger_mode,
)
# Pyth2wormhole relay
docker_build(
ref = "p2w-relay",

View File

@ -0,0 +1,41 @@
---
apiVersion: v1
kind: Service
metadata:
name: check-attestations
labels:
app: check-attestations
spec:
clusterIP: None
selector:
app: check-attestations
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: check-attestations
spec:
selector:
matchLabels:
app: check-attestations
serviceName: check-attestations
replicas: 1
template:
metadata:
labels:
app: check-attestations
spec:
restartPolicy: Always
terminationGracePeriodSeconds: 0
containers:
- name: check-attestations
image: check-attestations
command:
- python3
- /usr/src/pyth/check_attestations.py
tty: true
readinessProbe:
tcpSocket:
port: 2000
periodSeconds: 1
failureThreshold: 300

View File

@ -0,0 +1,11 @@
#syntax=docker/dockerfile:1.2@sha256:e2a8561e419ab1ba6b2fe6cbdf49fd92b95912df1cf7d313c3e2230a333fdbcc
FROM python:3.9-alpine
RUN pip install base58
ADD third_party/pyth/pyth_utils.py /usr/src/pyth/pyth_utils.py
ADD third_party/pyth/check_attestations.py /usr/src/pyth/check_attestations.py
RUN chmod a+rx /usr/src/pyth/*.py
ENV READINESS_PORT=2000

45
third_party/pyth/check_attestations.py vendored Normal file
View File

@ -0,0 +1,45 @@
#!/usr/bin/env python3
# This script is a CI test in tilt that verifies that prices are flowing through the entire system properly.
# It checks that all prices being published by the pyth publisher are showing up at the price service.
import base58
import logging
import time
from pyth_utils import *
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s | %(module)s | %(levelname)s | %(message)s"
)
# Where to read the set of accounts from
PYTH_TEST_ACCOUNTS_HOST = "pyth"
PYTH_TEST_ACCOUNTS_PORT = 4242
PRICE_SERVICE_HOST = "pyth-price-service"
PRICE_SERVICE_PORT = 4200
def base58_to_hex(base58_string):
asc_string = base58.b58decode(base58_string)
return asc_string.hex()
all_prices_attested = False
while not all_prices_attested:
publisher_state_map = get_pyth_accounts(PYTH_TEST_ACCOUNTS_HOST, PYTH_TEST_ACCOUNTS_PORT)
pyth_price_account_ids = sorted([base58_to_hex(x["price"]) for x in publisher_state_map["symbols"]])
price_ids = sorted(get_json(PRICE_SERVICE_HOST, PRICE_SERVICE_PORT, "/api/price_feed_ids"))
if price_ids == pyth_price_account_ids:
if publisher_state_map["all_symbols_added"]:
logging.info("Price ids match and all symbols added. Enabling readiness probe")
all_prices_attested = True
else:
logging.info("Price ids match but still waiting for more symbols to come online.")
else:
logging.info("Price ids do not match")
logging.info(f"published ids: {pyth_price_account_ids}")
logging.info(f"attested ids: {price_ids}")
time.sleep(10)
# Let k8s know the service is up
readiness()

View File

@ -109,20 +109,7 @@ if P2W_INITIALIZE_SOL_CONTRACT is not None:
# Retrieve available symbols from the test pyth publisher if not provided in envs
if P2W_ATTESTATION_CFG is None:
P2W_ATTESTATION_CFG = "./attestation_cfg_test.yaml"
conn = HTTPConnection(PYTH_TEST_ACCOUNTS_HOST, PYTH_TEST_ACCOUNTS_PORT)
conn.request("GET", "/")
res = conn.getresponse()
publisher_state_map = {}
if res.getheader("Content-Type") == "application/json":
publisher_state_map = json.load(res)
else:
logging.error("Bad Content type")
sys.exit(1)
publisher_state_map = get_pyth_accounts(PYTH_TEST_ACCOUNTS_HOST, PYTH_TEST_ACCOUNTS_PORT)
pyth_accounts = publisher_state_map["symbols"]
logging.info(
@ -167,7 +154,7 @@ symbol_groups:
cfg_yaml += f"""
- group_name: longer_interval_sensitive_changes
conditions:
min_interval_secs: 10
min_interval_secs: 3
price_changed_bps: 300
symbols:
"""
@ -186,7 +173,7 @@ symbol_groups:
cfg_yaml += f"""
- group_name: mapping
conditions:
min_interval_secs: 30
min_interval_secs: 10
price_changed_bps: 500
symbols: []
"""

View File

@ -31,7 +31,9 @@ class PythAccEndpoint(BaseHTTPRequestHandler):
self.wfile.flush()
# Test publisher state that gets served via the HTTP endpoint. Note: the schema of this dict is extended here and there
HTTP_ENDPOINT_DATA = {"symbols": [], "mapping_address": None}
# all_symbols_added is set to True once all dynamically-created symbols are added to the on-chain program. This
# flag allows the integration test in check_attestations.py to determine that every on-chain symbol is being attested.
HTTP_ENDPOINT_DATA = {"symbols": [], "mapping_address": None, "all_symbols_added": False}
def publisher_random_update(price_pubkey):
@ -154,8 +156,8 @@ with ThreadPoolExecutor() as executor: # Used for async adding of products and p
# Add a symbol if new symbol interval configured. This will add a new symbol if PYTH_NEW_SYMBOL_INTERVAL_SECS
# is passed since adding the previous symbol. The second constraint ensures that
# at most PYTH_TEST_SYMBOL_COUNT new price symbols are created.
if PYTH_NEW_SYMBOL_INTERVAL_SECS > 0 and dynamically_added_symbols < PYTH_TEST_SYMBOL_COUNT:
# at most PYTH_DYNAMIC_SYMBOL_COUNT new price symbols are created.
if PYTH_NEW_SYMBOL_INTERVAL_SECS > 0 and dynamically_added_symbols < PYTH_DYNAMIC_SYMBOL_COUNT:
# Do it if enough time passed
now = time.monotonic()
if (now - last_new_sym_added_at) >= PYTH_NEW_SYMBOL_INTERVAL_SECS:
@ -164,6 +166,9 @@ with ThreadPoolExecutor() as executor: # Used for async adding of products and p
next_new_symbol_id += 1
dynamically_added_symbols += 1
if dynamically_added_symbols >= PYTH_DYNAMIC_SYMBOL_COUNT:
HTTP_ENDPOINT_DATA["all_symbols_added"] = True
time.sleep(PYTH_PUBLISHER_INTERVAL_SECS)
sys.stdout.flush()

View File

@ -1,7 +1,10 @@
import logging
import os
import json
import socketserver
import subprocess
import sys
from http.client import HTTPConnection
# Settings specific to local devnet Pyth instance
PYTH = os.environ.get("PYTH", "./pyth")
@ -17,6 +20,7 @@ PYTH_PUBLISHER_KEYPAIR = os.environ.get(
# How long to sleep between mock Pyth price updates
PYTH_PUBLISHER_INTERVAL_SECS = float(os.environ.get("PYTH_PUBLISHER_INTERVAL_SECS", "5"))
PYTH_TEST_SYMBOL_COUNT = int(os.environ.get("PYTH_TEST_SYMBOL_COUNT", "11"))
PYTH_DYNAMIC_SYMBOL_COUNT = int(os.environ.get("PYTH_DYNAMIC_SYMBOL_COUNT", "3"))
# If above 0, adds a new test symbol periodically, waiting at least
# the given number of seconds in between
@ -24,7 +28,7 @@ PYTH_TEST_SYMBOL_COUNT = int(os.environ.get("PYTH_TEST_SYMBOL_COUNT", "11"))
# NOTE: the new symbols are added in the HTTP endpoint used by the
# p2w-attest service in Tilt. You may need to wait to see p2w-attest
# pick up brand new symbols
PYTH_NEW_SYMBOL_INTERVAL_SECS = int(os.environ.get("PYTH_NEW_SYMBOL_INTERVAL_SECS", "120"))
PYTH_NEW_SYMBOL_INTERVAL_SECS = int(os.environ.get("PYTH_NEW_SYMBOL_INTERVAL_SECS", "30"))
PYTH_MAPPING_KEYPAIR = os.environ.get(
"PYTH_MAPPING_KEYPAIR", f"{PYTH_KEY_STORE}/mapping_key_pair.json"
@ -108,6 +112,24 @@ def sol_run_or_die(subcommand, args=[], **kwargs):
return run_or_die(["solana", subcommand] + args + ["--url", SOL_RPC_URL], **kwargs)
def get_json(host, port, path):
conn = HTTPConnection(host, port)
conn.request("GET", path)
res = conn.getresponse()
# starstwith because the header value may include optional fields after (like charset)
if res.getheader("Content-Type").startswith("application/json"):
return json.load(res)
else:
logging.error(f"Error getting {host}:{port}{path} : Content-Type was not application/json")
logging.error(f"HTTP response code: {res.getcode()}")
logging.error(f"HTTP headers: {res.getheaders()}")
logging.error(f"Message: {res.msg}")
sys.exit(1)
def get_pyth_accounts(host, port):
return get_json(host, port, "/")
class ReadinessTCPHandler(socketserver.StreamRequestHandler):
def handle(self):
"""TCP black hole"""