Node: Remove cutover checks (#4149)

This commit is contained in:
bruce-riley 2024-10-30 13:16:22 -05:00 committed by GitHub
parent c35940ae96
commit 02f468f776
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 41 additions and 589 deletions

View File

@ -1,87 +0,0 @@
package p2p
import (
"fmt"
"strings"
"sync/atomic"
"time"
"go.uber.org/zap"
)
// The format of this time is very picky. Please use the exact format specified by cutOverFmtStr!
const mainnetCutOverTimeStr = "2024-10-29T09:00:00-0500"
const testnetCutOverTimeStr = "2024-09-24T09:00:00-0500"
const devnetCutOverTimeStr = "2024-08-01T00:00:00-0500"
const cutOverFmtStr = "2006-01-02T15:04:05-0700"
// gossipCutoverCompleteFlag indicates if the cutover time has passed, meaning we should publish only on the new topics.
var gossipCutoverCompleteFlag atomic.Bool
// GossipCutoverComplete returns true if the cutover time has passed, meaning we should publish on the new topic.
func GossipCutoverComplete() bool {
return gossipCutoverCompleteFlag.Load()
}
// evaluateCutOver determines if the gossip cutover time has passed yet and sets the global flag accordingly. If the time has
// not yet passed, it creates a go routine to wait for that time and then set the flag.
func evaluateGossipCutOver(logger *zap.Logger, networkID string) error {
cutOverTimeStr := getCutOverTimeStr(networkID)
sco, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, time.Now())
if err != nil {
return err
}
gossipCutoverCompleteFlag.Store(sco)
logger.Info("evaluated cutover flag", zap.Bool("cutOverFlag", GossipCutoverComplete()), zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco"))
if delay != time.Duration(0) {
// Wait for the cut over time and then update the flag.
go func() {
time.Sleep(delay)
logger.Info("time to cut over to new gossip topics", zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco"))
gossipCutoverCompleteFlag.Store(true)
}()
}
return nil
}
// evaluateGossipCutOverImpl performs the actual cut over check. It is a separate function for testing purposes.
func evaluateGossipCutOverImpl(logger *zap.Logger, cutOverTimeStr string, now time.Time) (bool, time.Duration, error) {
if cutOverTimeStr == "" {
return false, 0, nil
}
cutOverTime, err := time.Parse(cutOverFmtStr, cutOverTimeStr)
if err != nil {
return false, 0, fmt.Errorf(`failed to parse cut over time: %w`, err)
}
if cutOverTime.Before(now) {
logger.Info("cut over time has passed, should use new gossip topics", zap.String("cutOverTime", cutOverTime.Format(cutOverFmtStr)), zap.String("now", now.Format(cutOverFmtStr)), zap.String("component", "p2pco"))
return true, 0, nil
}
// If we get here, we need to wait for the cutover and then switch the global flag.
delay := cutOverTime.Sub(now)
logger.Info("still waiting for cut over time",
zap.Stringer("cutOverTime", cutOverTime),
zap.String("now", now.Format(cutOverFmtStr)),
zap.Stringer("delay", delay),
zap.String("component", "p2pco"))
return false, delay, nil
}
// getCutOverTimeStr returns the cut over time string based on the network ID passed in.
func getCutOverTimeStr(networkID string) string { //nolint:unparam
if strings.Contains(networkID, "/mainnet/") {
return mainnetCutOverTimeStr
}
if strings.Contains(networkID, "/testnet/") {
return testnetCutOverTimeStr
}
return devnetCutOverTimeStr
}

View File

@ -1,81 +0,0 @@
package p2p
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestVerifyCutOverTime(t *testing.T) {
if mainnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, mainnetCutOverTimeStr)
require.NoError(t, err)
}
if testnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, testnetCutOverTimeStr)
require.NoError(t, err)
}
if devnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, devnetCutOverTimeStr)
require.NoError(t, err)
}
}
func TestGetCutOverTimeStr(t *testing.T) {
assert.Equal(t, mainnetCutOverTimeStr, getCutOverTimeStr("blah/blah/mainnet/blah"))
assert.Equal(t, testnetCutOverTimeStr, getCutOverTimeStr("blah/blah/testnet/blah"))
assert.Equal(t, devnetCutOverTimeStr, getCutOverTimeStr("blah/blah/devnet/blah"))
}
func TestCutOverDisabled(t *testing.T) {
logger := zap.NewNop()
cutOverTimeStr := ""
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)
cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.False(t, cuttingOver)
assert.Equal(t, time.Duration(0), delay)
}
func TestCutOverInvalidTime(t *testing.T) {
logger := zap.NewNop()
cutOverTimeStr := "Hello World"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)
_, _, err = evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.EqualError(t, err, `failed to parse cut over time: parsing time "Hello World" as "2006-01-02T15:04:05-0700": cannot parse "Hello World" as "2006"`)
}
func TestCutOverAlreadyHappened(t *testing.T) {
logger := zap.NewNop()
cutOverTimeStr := "2023-10-06T18:18:00-0000"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)
cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.True(t, cuttingOver)
assert.Equal(t, time.Duration(0), delay)
}
func TestCutOverDelayRequired(t *testing.T) {
logger := zap.NewNop()
cutOverTimeStr := "2023-10-06T18:18:00-0000"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T17:18:00-0000")
require.NoError(t, err)
cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.False(t, cuttingOver)
assert.Equal(t, time.Duration(60*time.Minute), delay)
}

View File

@ -311,16 +311,6 @@ func Run(params *RunParams) func(ctx context.Context) error {
logger := supervisor.Logger(ctx) logger := supervisor.Logger(ctx)
// Evaluate the gossip cutover time. If it has passed, then the flag will be set to make us publish on the new topics.
// If not, a routine will be started to wait for that time before starting to publish on the new topics.
cutoverErr := evaluateGossipCutOver(logger, params.networkID)
if cutoverErr != nil {
panic(cutoverErr)
}
// If the cutover has not happened yet, we need to join and subscribe to the VAA topic because it is also the old topic.
needOldTopic := !GossipCutoverComplete()
defer func() { defer func() {
// TODO: Right now we're canceling the root context because it used to be the case that libp2p cannot be cleanly restarted. // TODO: Right now we're canceling the root context because it used to be the case that libp2p cannot be cleanly restarted.
// But that seems to no longer be the case. We may want to revisit this. See (https://github.com/libp2p/go-libp2p/issues/992) for background. // But that seems to no longer be the case. We may want to revisit this. See (https://github.com/libp2p/go-libp2p/issues/992) for background.
@ -422,7 +412,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
} }
// Set up the VAA channel. //////////////////////////////////////////////////////////////////// // Set up the VAA channel. ////////////////////////////////////////////////////////////////////
if params.gossipVaaSendC != nil || params.signedIncomingVaaRecvC != nil || needOldTopic { if params.gossipVaaSendC != nil || params.signedIncomingVaaRecvC != nil {
vaaTopic := fmt.Sprintf("%s/%s", params.networkID, "broadcast") vaaTopic := fmt.Sprintf("%s/%s", params.networkID, "broadcast")
logger.Info("joining the vaa topic", zap.String("topic", vaaTopic)) logger.Info("joining the vaa topic", zap.String("topic", vaaTopic))
vaaPubsubTopic, err = ps.Join(vaaTopic) vaaPubsubTopic, err = ps.Join(vaaTopic)
@ -436,7 +426,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
} }
}() }()
if params.signedIncomingVaaRecvC != nil || needOldTopic { if params.signedIncomingVaaRecvC != nil {
logger.Info("subscribing to the vaa topic", zap.String("topic", vaaTopic)) logger.Info("subscribing to the vaa topic", zap.String("topic", vaaTopic))
vaaSubscription, err = vaaPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) vaaSubscription, err = vaaPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
if err != nil { if err != nil {
@ -528,9 +518,6 @@ func Run(params *RunParams) func(ctx context.Context) error {
} }
features := make([]string, 0) features := make([]string, 0)
if GossipCutoverComplete() {
features = append(features, "p2p:new_gossip")
}
if params.processorFeaturesFunc != nil { if params.processorFeaturesFunc != nil {
flag := params.processorFeaturesFunc() flag := params.processorFeaturesFunc()
if flag != "" { if flag != "" {
@ -597,21 +584,13 @@ func Run(params *RunParams) func(ctx context.Context) error {
return b return b
}() }()
if GossipCutoverComplete() { if controlPubsubTopic == nil {
if controlPubsubTopic == nil { panic("controlPubsubTopic should not be nil when nodeName is set")
panic("controlPubsubTopic should not be nil when nodeName is set") }
} err = controlPubsubTopic.Publish(ctx, b)
err = controlPubsubTopic.Publish(ctx, b) p2pMessagesSent.WithLabelValues("control").Inc()
p2pMessagesSent.WithLabelValues("control").Inc() if err != nil {
if err != nil { logger.Warn("failed to publish heartbeat message", zap.Error(err))
logger.Warn("failed to publish heartbeat message", zap.Error(err))
}
} else if vaaPubsubTopic != nil {
err = vaaPubsubTopic.Publish(ctx, b)
p2pMessagesSent.WithLabelValues("old_control").Inc()
if err != nil {
logger.Warn("failed to publish heartbeat message to old topic", zap.Error(err))
}
} }
p2pHeartbeatsSent.Inc() p2pHeartbeatsSent.Inc()
@ -629,38 +608,22 @@ func Run(params *RunParams) func(ctx context.Context) error {
case <-ctx.Done(): case <-ctx.Done():
return return
case msg := <-params.gossipControlSendC: case msg := <-params.gossipControlSendC:
if GossipCutoverComplete() { if controlPubsubTopic == nil {
if controlPubsubTopic == nil { panic("controlPubsubTopic should not be nil when gossipControlSendC is set")
panic("controlPubsubTopic should not be nil when gossipControlSendC is set") }
} err := controlPubsubTopic.Publish(ctx, msg)
err := controlPubsubTopic.Publish(ctx, msg) p2pMessagesSent.WithLabelValues("control").Inc()
p2pMessagesSent.WithLabelValues("control").Inc() if err != nil {
if err != nil { logger.Error("failed to publish message from control queue", zap.Error(err))
logger.Error("failed to publish message from control queue", zap.Error(err))
}
} else if vaaPubsubTopic != nil {
err := vaaPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("old_control").Inc()
if err != nil {
logger.Error("failed to publish message from control queue to old topic", zap.Error(err))
}
} }
case msg := <-params.gossipAttestationSendC: case msg := <-params.gossipAttestationSendC:
if GossipCutoverComplete() { if attestationPubsubTopic == nil {
if attestationPubsubTopic == nil { panic("attestationPubsubTopic should not be nil when gossipAttestationSendC is set")
panic("attestationPubsubTopic should not be nil when gossipAttestationSendC is set") }
} err := attestationPubsubTopic.Publish(ctx, msg)
err := attestationPubsubTopic.Publish(ctx, msg) p2pMessagesSent.WithLabelValues("attestation").Inc()
p2pMessagesSent.WithLabelValues("attestation").Inc() if err != nil {
if err != nil { logger.Error("failed to publish message from attestation queue", zap.Error(err))
logger.Error("failed to publish message from attestation queue", zap.Error(err))
}
} else if vaaPubsubTopic != nil {
err := vaaPubsubTopic.Publish(ctx, msg)
p2pMessagesSent.WithLabelValues("old_attestation").Inc()
if err != nil {
logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err))
}
} }
case msg := <-params.gossipVaaSendC: case msg := <-params.gossipVaaSendC:
if vaaPubsubTopic == nil { if vaaPubsubTopic == nil {
@ -704,23 +667,15 @@ func Run(params *RunParams) func(ctx context.Context) error {
params.obsvReqRecvC <- msg params.obsvReqRecvC <- msg
} }
if GossipCutoverComplete() { if controlPubsubTopic == nil {
if controlPubsubTopic == nil { panic("controlPubsubTopic should not be nil when obsvReqSendC is set")
panic("controlPubsubTopic should not be nil when obsvReqSendC is set") }
} err = controlPubsubTopic.Publish(ctx, b)
err = controlPubsubTopic.Publish(ctx, b) p2pMessagesSent.WithLabelValues("control").Inc()
p2pMessagesSent.WithLabelValues("control").Inc() if err != nil {
if err != nil { logger.Error("failed to publish observation request", zap.Error(err))
logger.Error("failed to publish observation request", zap.Error(err)) } else {
} else { logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq))
logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq))
}
} else if vaaPubsubTopic != nil {
err = vaaPubsubTopic.Publish(ctx, b)
p2pMessagesSent.WithLabelValues("old_control").Inc()
if err != nil {
logger.Error("failed to publish observation request to old topic", zap.Error(err))
}
} }
} }
} }
@ -991,97 +946,6 @@ func Run(params *RunParams) func(ctx context.Context) error {
} }
switch m := msg.Message.(type) { switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_SignedHeartbeat: // TODO: Get rid of this after the cutover.
s := m.SignedHeartbeat
gs := params.gst.Get()
if gs == nil {
// No valid guardian set yet - dropping heartbeat
if logger.Level().Enabled(params.components.SignedHeartbeatLogLevel) {
logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set",
zap.Any("value", s),
zap.String("from", envelope.GetFrom().String()))
}
break
}
if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil {
p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
if logger.Level().Enabled(params.components.SignedHeartbeatLogLevel) {
logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received",
zap.Error(err),
zap.Any("payload", msg.Message),
zap.Any("value", s),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
}
} else {
p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc()
if logger.Level().Enabled(params.components.SignedHeartbeatLogLevel) {
logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received",
zap.Any("value", heartbeat),
zap.String("from", envelope.GetFrom().String()))
}
func() {
if len(heartbeat.P2PNodeId) != 0 {
params.components.ProtectedHostByGuardianKeyLock.Lock()
defer params.components.ProtectedHostByGuardianKeyLock.Unlock()
var peerId peer.ID
if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil {
logger.Error("p2p_node_id_in_heartbeat_invalid",
zap.Any("payload", msg.Message),
zap.Any("value", s),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
} else {
guardianAddr := eth_common.BytesToAddress(s.GuardianAddr)
if params.guardianSigner == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.guardianSigner.PublicKey()) {
prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr]
if ok {
if prevPeerId != peerId {
logger.Info("p2p_guardian_peer_changed",
zap.String("guardian_addr", guardianAddr.String()),
zap.String("prevPeerId", prevPeerId.String()),
zap.String("newPeerId", peerId.String()),
)
params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat")
params.components.ConnMgr.Protect(peerId, "heartbeat")
params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId
}
} else {
params.components.ConnMgr.Protect(peerId, "heartbeat")
params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId
}
}
}
} else {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName))
}
}
}()
}
case *gossipv1.GossipMessage_SignedObservation: // TODO: Get rid of this after the cutover.
if params.obsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil {
p2pMessagesReceived.WithLabelValues("observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash)))
}
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedObservationBatch: // TODO: Get rid of this after the cutover.
if params.batchObsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservationBatch, params.batchObsvRecvC); err == nil {
p2pMessagesReceived.WithLabelValues("batch_observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservationBatch because obsvRecvC is full")
}
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedVaaWithQuorum: case *gossipv1.GossipMessage_SignedVaaWithQuorum:
if params.signedIncomingVaaRecvC != nil { if params.signedIncomingVaaRecvC != nil {
select { select {
@ -1098,48 +962,6 @@ func Run(params *RunParams) func(ctx context.Context) error {
p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc() p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc()
} }
} }
case *gossipv1.GossipMessage_SignedObservationRequest: // TODO: Get rid of this after the cutover.
if params.obsvReqRecvC != nil {
s := m.SignedObservationRequest
gs := params.gst.Get()
if gs == nil {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String()))
}
break
}
r, err := processSignedObservationRequest(s, gs)
if err != nil {
p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc()
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("invalid signed observation request received",
zap.Error(err),
zap.Any("payload", msg.Message),
zap.Any("value", s),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
}
} else {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String()))
}
select {
case params.obsvReqRecvC <- r:
p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc()
default:
p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc()
}
}
}
case *gossipv1.GossipMessage_SignedChainGovernorConfig: // TODO: Get rid of this after the cutover.
if params.signedGovCfgRecvC != nil {
params.signedGovCfgRecvC <- m.SignedChainGovernorConfig
}
case *gossipv1.GossipMessage_SignedChainGovernorStatus: // TODO: Get rid of this after the cutover.
if params.signedGovStatusRecvC != nil {
params.signedGovStatusRecvC <- m.SignedChainGovernorStatus
}
default: default:
p2pMessagesReceived.WithLabelValues("unknown").Inc() p2pMessagesReceived.WithLabelValues("unknown").Inc()
logger.Warn("received unknown message type on vaa topic (running outdated software?)", logger.Warn("received unknown message type on vaa topic (running outdated software?)",

View File

@ -44,7 +44,7 @@ func BenchmarkHandleObservation(b *testing.B) {
ctx := context.Background() ctx := context.Background()
db := db.OpenDb(nil, nil) db := db.OpenDb(nil, nil)
defer db.Close() defer db.Close()
p, pd := createProcessorForTest(b, NumObservations, ctx, db, false) p, pd := createProcessorForTest(b, NumObservations, ctx, db)
require.NotNil(b, p) require.NotNil(b, p)
require.NotNil(b, pd) require.NotNil(b, pd)
@ -102,7 +102,7 @@ func BenchmarkProfileHandleObservation(b *testing.B) {
ctx := context.Background() ctx := context.Background()
db := db.OpenDb(nil, nil) db := db.OpenDb(nil, nil)
defer db.Close() defer db.Close()
p, pd := createProcessorForTest(b, NumObservations, ctx, db, false) p, pd := createProcessorForTest(b, NumObservations, ctx, db)
require.NotNil(b, p) require.NotNil(b, p)
require.NotNil(b, pd) require.NotNil(b, pd)
@ -131,7 +131,7 @@ func (pd *ProcessorData) messageID(seqNum uint64) string {
} }
// createProcessorForTest creates a processor for benchmarking. It assumes we are index zero in the guardian set. // createProcessorForTest creates a processor for benchmarking. It assumes we are index zero in the guardian set.
func createProcessorForTest(b *testing.B, numVAAs int, ctx context.Context, db *db.Database, useBatching bool) (*Processor, *ProcessorData) { func createProcessorForTest(b *testing.B, numVAAs int, ctx context.Context, db *db.Database) (*Processor, *ProcessorData) {
b.Helper() b.Helper()
logger := zap.NewNop() logger := zap.NewNop()
@ -185,8 +185,6 @@ func createProcessorForTest(b *testing.B, numVAAs int, ctx context.Context, db *
gatewayRelayer: gwRelayer, gatewayRelayer: gwRelayer,
} }
batchCutoverCompleteFlag.Store(useBatching)
go func() { _ = p.vaaWriter(ctx) }() go func() { _ = p.vaaWriter(ctx) }()
go func() { _ = p.batchProcessor(ctx) }() go func() { _ = p.batchProcessor(ctx) }()

View File

@ -47,36 +47,14 @@ func (p *Processor) broadcastSignature(
MessageId: messageID, MessageId: messageID,
} }
if batchCutoverComplete() { if shouldPublishImmediately {
if shouldPublishImmediately { msg = p.publishImmediately(ourObs)
msg = p.publishImmediately(ourObs)
observationsBroadcast.Inc()
} else {
p.postObservationToBatch(ourObs)
batchObservationsBroadcast.Inc()
}
} else {
// Post the observation in its own gossip message.
obsv := gossipv1.SignedObservation{
Addr: p.ourAddr.Bytes(),
Hash: digest.Bytes(),
Signature: signature,
TxHash: txhash,
MessageId: messageID,
}
w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedObservation{SignedObservation: &obsv}}
var err error
msg, err = proto.Marshal(&w)
if err != nil {
panic(err)
}
// Broadcast the observation.
p.gossipAttestationSendC <- msg
observationsBroadcast.Inc() observationsBroadcast.Inc()
} else {
p.postObservationToBatch(ourObs)
batchObservationsBroadcast.Inc()
} }
return ourObs, msg return ourObs, msg
} }

View File

@ -1,87 +0,0 @@
package processor
import (
"fmt"
"strings"
"sync/atomic"
"time"
"go.uber.org/zap"
)
// The format of this time is very picky. Please use the exact format specified by cutOverFmtStr!
const mainnetCutOverTimeStr = "2024-10-29T09:00:00-0500"
const testnetCutOverTimeStr = "2024-09-24T09:00:00-0500"
const devnetCutOverTimeStr = "2024-08-01T00:00:00-0500"
const cutOverFmtStr = "2006-01-02T15:04:05-0700"
// batchCutoverCompleteFlag indicates if the cutover time has passed, meaning we should publish observation batches.
var batchCutoverCompleteFlag atomic.Bool
// batchCutoverComplete returns true if the cutover time has passed, meaning we should publish observation batches.
func batchCutoverComplete() bool {
return batchCutoverCompleteFlag.Load()
}
// evaluateCutOver determines if the cutover time has passed yet and sets the global flag accordingly. If the time has
// not yet passed, it creates a go routine to wait for that time and then sets the flag.
func evaluateBatchCutover(logger *zap.Logger, networkID string) error {
cutOverTimeStr := getCutOverTimeStr(networkID)
sco, delay, err := evaluateBatchCutoverImpl(logger, cutOverTimeStr, time.Now())
if err != nil {
return err
}
batchCutoverCompleteFlag.Store(sco)
logger.Info("evaluated cutover flag", zap.Bool("cutOverFlag", batchCutoverComplete()), zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "batchco"))
if delay != time.Duration(0) {
// Wait for the cut over time and then update the flag.
go func() {
time.Sleep(delay)
logger.Info("time to cut over to batch publishing", zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "batchco"))
batchCutoverCompleteFlag.Store(true)
}()
}
return nil
}
// evaluateBatchCutoverImpl performs the actual cut over check. It is a separate function for testing purposes.
func evaluateBatchCutoverImpl(logger *zap.Logger, cutOverTimeStr string, now time.Time) (bool, time.Duration, error) {
if cutOverTimeStr == "" {
return false, 0, nil
}
cutOverTime, err := time.Parse(cutOverFmtStr, cutOverTimeStr)
if err != nil {
return false, 0, fmt.Errorf(`failed to parse cut over time: %w`, err)
}
if cutOverTime.Before(now) {
logger.Info("cut over time has passed, should publish observation batches", zap.String("cutOverTime", cutOverTime.Format(cutOverFmtStr)), zap.String("now", now.Format(cutOverFmtStr)), zap.String("component", "batchco"))
return true, 0, nil
}
// If we get here, we need to wait for the cutover and then switch the global flag.
delay := cutOverTime.Sub(now)
logger.Info("still waiting for cut over time",
zap.Stringer("cutOverTime", cutOverTime),
zap.String("now", now.Format(cutOverFmtStr)),
zap.Stringer("delay", delay),
zap.String("component", "batchco"))
return false, delay, nil
}
// getCutOverTimeStr returns the cut over time string based on the network ID passed in.
func getCutOverTimeStr(networkID string) string { //nolint:unparam
if strings.Contains(networkID, "/mainnet/") {
return mainnetCutOverTimeStr
}
if strings.Contains(networkID, "/testnet/") {
return testnetCutOverTimeStr
}
return devnetCutOverTimeStr
}

View File

@ -1,81 +0,0 @@
package processor
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestVerifyCutOverTime(t *testing.T) {
if mainnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, mainnetCutOverTimeStr)
require.NoError(t, err)
}
if testnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, testnetCutOverTimeStr)
require.NoError(t, err)
}
if devnetCutOverTimeStr != "" {
_, err := time.Parse(cutOverFmtStr, devnetCutOverTimeStr)
require.NoError(t, err)
}
}
func TestGetCutOverTimeStr(t *testing.T) {
assert.Equal(t, mainnetCutOverTimeStr, getCutOverTimeStr("blah/blah/mainnet/blah"))
assert.Equal(t, testnetCutOverTimeStr, getCutOverTimeStr("blah/blah/testnet/blah"))
assert.Equal(t, devnetCutOverTimeStr, getCutOverTimeStr("blah/blah/devnet/blah"))
}
func TestCutOverDisabled(t *testing.T) {
logger := zap.NewNop()
cutOverTimeStr := ""
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)
cuttingOver, delay, err := evaluateBatchCutoverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.False(t, cuttingOver)
assert.Equal(t, time.Duration(0), delay)
}
func TestCutOverInvalidTime(t *testing.T) {
logger := zap.NewNop()
cutOverTimeStr := "Hello World"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)
_, _, err = evaluateBatchCutoverImpl(logger, cutOverTimeStr, now)
require.EqualError(t, err, `failed to parse cut over time: parsing time "Hello World" as "2006-01-02T15:04:05-0700": cannot parse "Hello World" as "2006"`)
}
func TestCutOverAlreadyHappened(t *testing.T) {
logger := zap.NewNop()
cutOverTimeStr := "2023-10-06T18:18:00-0000"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000")
require.NoError(t, err)
cuttingOver, delay, err := evaluateBatchCutoverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.True(t, cuttingOver)
assert.Equal(t, time.Duration(0), delay)
}
func TestCutOverDelayRequired(t *testing.T) {
logger := zap.NewNop()
cutOverTimeStr := "2023-10-06T18:18:00-0000"
now, err := time.Parse(cutOverFmtStr, "2023-10-06T17:18:00-0000")
require.NoError(t, err)
cuttingOver, delay, err := evaluateBatchCutoverImpl(logger, cutOverTimeStr, now)
require.NoError(t, err)
assert.False(t, cuttingOver)
assert.Equal(t, time.Duration(60*time.Minute), delay)
}

View File

@ -266,13 +266,6 @@ func NewProcessor(
} }
func (p *Processor) Run(ctx context.Context) error { func (p *Processor) Run(ctx context.Context) error {
// Evaluate the batch cutover time. If it has passed, then the flag will be set to make us publish observation batches.
// If not, a routine will be started to wait for that time before starting to publish batches.
cutoverErr := evaluateBatchCutover(p.logger, p.networkID)
if cutoverErr != nil {
panic(cutoverErr)
}
if err := supervisor.Run(ctx, "vaaWriter", common.WrapWithScissors(p.vaaWriter, "vaaWriter")); err != nil { if err := supervisor.Run(ctx, "vaaWriter", common.WrapWithScissors(p.vaaWriter, "vaaWriter")); err != nil {
return fmt.Errorf("failed to start vaa writer: %w", err) return fmt.Errorf("failed to start vaa writer: %w", err)
} }
@ -474,8 +467,5 @@ func (p *Processor) vaaWriter(ctx context.Context) error {
// GetFeatures returns the processor feature string that can be published in heartbeat messages. // GetFeatures returns the processor feature string that can be published in heartbeat messages.
func GetFeatures() string { func GetFeatures() string {
if batchCutoverComplete() {
return "processor:batching"
}
return "" return ""
} }