"""Helpers for invoking the pyth and solana CLIs and serving a readiness port."""

import os
import socketserver
import subprocess
import sys

# Paths and tunables; each one can be overridden through the environment.
PYTH = os.environ.get("PYTH", "./pyth")
PYTH_KEY_STORE = os.environ.get("PYTH_KEY_STORE", "/home/pyth/.pythd")
# NOTE(review): the program and publisher keypairs default to the same file;
# confirm this is intentional.
PYTH_PROGRAM_KEYPAIR = os.environ.get(
    "PYTH_PROGRAM_KEYPAIR", f"{PYTH_KEY_STORE}/publish_key_pair.json")
PYTH_PROGRAM_SO_PATH = os.environ.get("PYTH_PROGRAM_SO", "../target/oracle.so")
PYTH_PUBLISHER_KEYPAIR = os.environ.get(
    "PYTH_PUBLISHER_KEYPAIR", f"{PYTH_KEY_STORE}/publish_key_pair.json")
PYTH_PUBLISHER_INTERVAL = float(os.environ.get("PYTH_PUBLISHER_INTERVAL", "5"))

SOL_AIRDROP_AMT = 100
SOL_RPC_HOST = "solana-devnet"
SOL_RPC_PORT = 8899
SOL_RPC_URL = f"http://{SOL_RPC_HOST}:{SOL_RPC_PORT}"

READINESS_PORT = int(os.environ.get("READINESS_PORT", "2000"))


def run_or_die(args, die=True, **kwargs):
    """
    Opinionated subprocess.run() call with fancy logging.

    Args:
        args: Command and arguments, passed to subprocess.run() as a list.
        die: When true, a non-zero exit status terminates this process via
            sys.exit() with the same status.
        **kwargs: Forwarded verbatim to subprocess.run() (text=True is
            always set by this function).

    Returns:
        The subprocess.CompletedProcess of the finished command.

    Raises:
        SystemExit: If the command fails and ``die`` is true.
    """
    # Stringify for logging only, so path-like arguments do not break join().
    args_readable = " ".join(map(str, args))
    print(f"CMD RUN\t{args_readable}", file=sys.stderr)
    sys.stderr.flush()

    ret = subprocess.run(args, text=True, **kwargs)

    # Compare by value: identity checks against int literals are unreliable.
    if ret.returncode != 0:
        print(f"CMD FAIL {ret.returncode}\t{args_readable}", file=sys.stderr)

        # stdout/stderr are None unless the caller asked for capture.
        out = ret.stdout if ret.stdout is not None else ""
        err = ret.stderr if ret.stderr is not None else ""

        print(f"CMD STDOUT\n{out}", file=sys.stderr)
        print(f"CMD STDERR\n{err}", file=sys.stderr)

        if die:
            sys.exit(ret.returncode)
        else:
            print("CMD DIE FALSE", file=sys.stderr)
    else:
        print(f"CMD OK\t{args_readable}", file=sys.stderr)

    sys.stderr.flush()
    return ret


def pyth_run_or_die(subcommand, args=None, debug=False, confirm=True, **kwargs):
    """
    Pyth boilerplate in front of run_or_die.

    Args:
        subcommand: Subcommand name placed right after the PYTH executable.
        args: Extra arguments for the subcommand; defaults to none.
        debug: Append ``-d`` to the command line when true.
        confirm: Append ``-n`` to the command line when false.
        **kwargs: Forwarded to run_or_die().

    Returns:
        Whatever run_or_die() returns.
    """
    extra = [] if args is None else list(args)
    return run_or_die(
        [PYTH, subcommand]
        + extra
        + (["-d"] if debug else [])
        # Note: not all pyth subcommands accept -n
        + ([] if confirm else ["-n"])
        + ["-k", PYTH_KEY_STORE]
        + ["-r", SOL_RPC_HOST]
        + ["-c", "finalized"],
        **kwargs)


def sol_run_or_die(subcommand, args=None, **kwargs):
    """
    Solana boilerplate in front of run_or_die.

    Args:
        subcommand: Subcommand name placed right after ``solana``.
        args: Extra arguments for the subcommand; defaults to none.
        **kwargs: Forwarded to run_or_die().

    Returns:
        Whatever run_or_die() returns.
    """
    extra = [] if args is None else list(args)
    return run_or_die(
        ["solana", subcommand] + extra + ["--url", SOL_RPC_URL],
        **kwargs)


class ReadinessTCPHandler(socketserver.StreamRequestHandler):
    """Request handler that reads from the connection and never replies."""

    def handle(self):
        """TCP black hole"""
        # Read up to 64 bytes and discard them.
        self.rfile.read(64)


def readiness():
    """
    Accept connections from readiness probe.

    Binds a TCP server to all interfaces on READINESS_PORT and serves
    forever; this call does not return under normal operation.
    """
    with socketserver.TCPServer(("0.0.0.0", READINESS_PORT),
                                ReadinessTCPHandler) as srv:
        srv.serve_forever()
    # Earlier netcat-based variant, kept for reference:
    # run_or_die(["nc", "-k", "-l", "-p", READINESS_PORT])