terra-oracle/pkg/oracle/oracle.go

235 lines
5.9 KiB
Go

package oracle
import (
"context"
"fmt"
"log"
"time"
"github.com/certusone/terra-oracle/pkg/signer"
ctypes "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth"
tcontext "github.com/cosmos/cosmos-sdk/x/auth/client/txbuilder"
"github.com/google/uuid"
"github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/types"
"github.com/terra-project/core/app"
"github.com/terra-project/core/x/oracle"
)
type PriceOracleConfig struct {
Client *client.HTTP
ValAddress ctypes.ValAddress
ChainID string
TxFee ctypes.Coin
PriceProvider PriceProvider
Signer signer.Signer
}
type PriceOracle struct {
cdc *amino.Codec
client *client.HTTP
valAddress ctypes.ValAddress
chainID string
txFee ctypes.Coin
prevotes []Vote
prevotesPeriod int64
priceProvider PriceProvider
signer signer.Signer
}
func NewPriceOracle(config PriceOracleConfig) *PriceOracle { // wsClient *client.HTTP, valAddress ctypes.ValAddress, chainID string, priceProvider PriceProvider, signer Signer, txFee ctypes.Coin) *PriceOracle {
oracle := &PriceOracle{
cdc: app.MakeCodec(),
client: config.Client,
valAddress: config.ValAddress,
chainID: config.ChainID,
txFee: config.TxFee,
prevotes: []Vote{},
prevotesPeriod: 0,
priceProvider: config.PriceProvider,
signer: config.Signer,
}
return oracle
}
func (priceOracle *PriceOracle) ProcessingLoop() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*500)
defer cancel()
blocks, err := priceOracle.client.Subscribe(ctx, "oracle", types.QueryForEvent(types.EventNewBlock).String())
if err != nil {
return err
}
params, err := priceOracle.QueryOracleParams()
if err != nil {
return err
}
for event := range blocks {
blockData := event.Data.(types.EventDataNewBlock)
log.Printf("new block %d", blockData.Block.Height)
blockHeight := blockData.Block.Height
voteCycle := blockHeight / params.VotePeriod
// if we send reveal messaage, increment offset to send prevotes as next sequence
var offset uint64 = 0
// first submit reveals for previous vote if we have previous vote
if len(priceOracle.prevotes) > 0 && priceOracle.prevotesPeriod < voteCycle {
revealMsgs := priceOracle.RevealVotes()
if len(revealMsgs) > 0 {
log.Printf("revealing votes for period: %d\n", priceOracle.prevotesPeriod)
err := priceOracle.SendMessages(0, revealMsgs)
if err != nil {
log.Printf("error sending tx: %v", err)
} else {
priceOracle.prevotes = []Vote{}
offset = 1
}
}
}
// TODO(roman) if the reveal failed, the prevotes will override existing prevotes
// and we won't reveal - consider changing to keep prevotes until revealed
// or until the period passes and there is no point in revealing
// we have already submitted a prevote for this period
if priceOracle.prevotesPeriod == voteCycle {
continue
}
// if we are within 4 blocks of the end of the cycle, we don't prevote
// this is a "workaround" for "ensuring" our prevotes are included within the proper
// vote period and not the next vote period
cycleLastHeight := (voteCycle * params.VotePeriod) + params.VotePeriod
if blockHeight > (cycleLastHeight - 4) {
continue
}
// generate prevotes
votes, prevoteMsgs := priceOracle.SubmitVotes()
if len(prevoteMsgs) > 0 {
log.Printf("prevote for period: %d\n", voteCycle)
err := priceOracle.SendMessages(offset, prevoteMsgs)
if err != nil {
log.Printf("error sending tx: %v", err)
} else {
priceOracle.prevotes = votes
priceOracle.prevotesPeriod = voteCycle
}
}
}
return nil
}
func (priceOracle *PriceOracle) RevealVotes() []ctypes.Msg {
var msgs []ctypes.Msg
for _, vote := range priceOracle.prevotes {
msg := oracle.NewMsgPriceVote(vote.Price, vote.Salt, vote.Denom, vote.Feeder, priceOracle.valAddress)
msgs = append(msgs, msg)
}
return msgs
}
func (priceOracle *PriceOracle) SubmitVotes() ([]Vote, []ctypes.Msg) {
actives, err := priceOracle.QueryActives()
if err != nil {
panic(err)
}
var (
msgs []ctypes.Msg
votes []Vote
)
for _, denom := range actives {
// truncate leading `u` from the denom for price lookup
truncatedDenom := denom[1:]
price, err := priceOracle.priceProvider.GetPrice(truncatedDenom)
if err != nil {
log.Printf("could not get %s price; err=%v", denom, err)
continue
}
vote := Vote{
Feeder: priceOracle.signer.Address(),
Denom: denom,
Salt: uuid.New().String()[:3],
Price: price,
Validator: priceOracle.valAddress,
}
log.Printf("voting for %s with price %s", denom, price.String())
msg := oracle.NewMsgPricePrevote(vote.Hash(), vote.Denom, vote.Feeder, vote.Validator)
msgs = append(msgs, msg)
votes = append(votes, vote)
}
return votes, msgs
}
func (priceOracle *PriceOracle) SendMessages(sequenceOffset uint64, msgs []ctypes.Msg) error {
tx := auth.StdTx{}
tx.Msgs = msgs
acc, err := priceOracle.QueryAccount(priceOracle.signer.Address())
if err != nil {
return err
}
fee := auth.NewStdFee(50000, ctypes.Coins{priceOracle.txFee})
tx.Fee = fee
signMsg := tcontext.StdSignMsg{
ChainID: priceOracle.chainID,
AccountNumber: acc.GetAccountNumber(),
Sequence: acc.GetSequence() + sequenceOffset,
Memo: "",
Msgs: msgs,
Fee: fee,
}
signBytes := signMsg.Bytes()
signature, err := priceOracle.signer.Sign(signBytes)
if err != nil {
return err
}
tx.Signatures = []auth.StdSignature{
{
Signature: signature,
PubKey: priceOracle.signer.PubKey(),
},
}
txBytes, err := auth.DefaultTxEncoder(priceOracle.cdc)(tx)
if err != nil {
return err
}
res, err := priceOracle.client.BroadcastTxSync(txBytes)
if err != nil {
return err
}
if res.Code != 0 {
return fmt.Errorf("error sending tx: %v", res)
}
log.Printf("submitted tx: %v\n", res)
return nil
}