node/watchers/near: linter fixes
This commit is contained in:
parent
c718834021
commit
e979d20ccd
|
@ -30,7 +30,7 @@ func newFinalizer(eventChan chan eventType, nearAPI nearapi.NearApi, mainnet boo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f Finalizer) isFinalizedCached(logger *zap.Logger, ctx context.Context, blockHash string) (nearapi.BlockHeader, bool) {
|
func (f Finalizer) isFinalizedCached(logger *zap.Logger, blockHash string) (nearapi.BlockHeader, bool) {
|
||||||
if err := nearapi.IsWellFormedHash(blockHash); err != nil {
|
if err := nearapi.IsWellFormedHash(blockHash); err != nil {
|
||||||
// SECURITY defense-in-depth: check if block hash is well-formed
|
// SECURITY defense-in-depth: check if block hash is well-formed
|
||||||
logger.Error("blockHash invalid", zap.String("error_type", "invalid_hash"), zap.String("blockHash", blockHash), zap.Error(err))
|
logger.Error("blockHash invalid", zap.String("error_type", "invalid_hash"), zap.String("blockHash", blockHash), zap.Error(err))
|
||||||
|
@ -38,7 +38,7 @@ func (f Finalizer) isFinalizedCached(logger *zap.Logger, ctx context.Context, bl
|
||||||
}
|
}
|
||||||
|
|
||||||
if b, ok := f.finalizedBlocksCache.Get(blockHash); ok {
|
if b, ok := f.finalizedBlocksCache.Get(blockHash); ok {
|
||||||
blockHeader := b.(nearapi.BlockHeader)
|
blockHeader := b.(nearapi.BlockHeader) //nolint:forcetypeassert
|
||||||
// SECURITY In blocks < 74473147 message timestamps were computed differently and we don't want to re-observe these messages
|
// SECURITY In blocks < 74473147 message timestamps were computed differently and we don't want to re-observe these messages
|
||||||
if !f.mainnet || blockHeader.Height > 74473147 {
|
if !f.mainnet || blockHeader.Height > 74473147 {
|
||||||
return blockHeader, true
|
return blockHeader, true
|
||||||
|
@ -58,7 +58,7 @@ func (f Finalizer) isFinalized(logger *zap.Logger, ctx context.Context, queriedB
|
||||||
logger.Debug("checking block finalization", zap.String("method", "isFinalized"), zap.String("parameters", queriedBlockHash))
|
logger.Debug("checking block finalization", zap.String("method", "isFinalized"), zap.String("parameters", queriedBlockHash))
|
||||||
|
|
||||||
// check cache first
|
// check cache first
|
||||||
if block, ok := f.isFinalizedCached(logger, ctx, queriedBlockHash); ok {
|
if block, ok := f.isFinalizedCached(logger, queriedBlockHash); ok {
|
||||||
return block, true
|
return block, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,17 +92,17 @@ func (f Finalizer) isFinalized(logger *zap.Logger, ctx context.Context, queriedB
|
||||||
}
|
}
|
||||||
|
|
||||||
if queriedBlockHash == someFinalBlockHash {
|
if queriedBlockHash == someFinalBlockHash {
|
||||||
f.setFinalized(logger, ctx, queriedBlock.Header)
|
f.setFinalized(queriedBlock.Header)
|
||||||
// block was marked as finalized in the cache, so this should succeed now.
|
// block was marked as finalized in the cache, so this should succeed now.
|
||||||
// We don't return directly because setFinalized() contains some sanity checks.
|
// We don't return directly because setFinalized() contains some sanity checks.
|
||||||
return f.isFinalizedCached(logger, ctx, queriedBlockHash)
|
return f.isFinalizedCached(logger, queriedBlockHash)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// it seems like the block has not been finalized yet
|
// it seems like the block has not been finalized yet
|
||||||
return nearapi.BlockHeader{}, false
|
return nearapi.BlockHeader{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f Finalizer) setFinalized(logger *zap.Logger, ctx context.Context, blockHeader nearapi.BlockHeader) {
|
func (f Finalizer) setFinalized(blockHeader nearapi.BlockHeader) {
|
||||||
|
|
||||||
// SECURITY defense-in-depth: don't cache obviously corrupted data.
|
// SECURITY defense-in-depth: don't cache obviously corrupted data.
|
||||||
if nearapi.IsWellFormedHash(blockHeader.Hash) != nil || blockHeader.Timestamp == 0 || blockHeader.Height == 0 {
|
if nearapi.IsWellFormedHash(blockHeader.Hash) != nil || blockHeader.Timestamp == 0 || blockHeader.Height == 0 {
|
||||||
|
|
|
@ -78,7 +78,7 @@ func (s *ForwardingCachingServer) ProxyReq(logger *zap.Logger, req *http.Request
|
||||||
req.Body = io.NopCloser(bytes.NewReader(reqBody))
|
req.Body = io.NopCloser(bytes.NewReader(reqBody))
|
||||||
|
|
||||||
url := fmt.Sprintf("%s%s", s.upstreamHost, req.RequestURI)
|
url := fmt.Sprintf("%s%s", s.upstreamHost, req.RequestURI)
|
||||||
proxyReq, _ := http.NewRequest(req.Method, url, bytes.NewReader(reqBody))
|
proxyReq, _ := http.NewRequestWithContext(req.Context(), req.Method, url, bytes.NewReader(reqBody))
|
||||||
|
|
||||||
s.logger.Debug("proxy_req",
|
s.logger.Debug("proxy_req",
|
||||||
zap.String("url", url),
|
zap.String("url", url),
|
||||||
|
|
|
@ -50,7 +50,7 @@ type (
|
||||||
|
|
||||||
func NewHttpNearRpc(nearRPC string) HttpNearRpc {
|
func NewHttpNearRpc(nearRPC string) HttpNearRpc {
|
||||||
// Customize the Transport to have larger connection pool (default is only 2 per host)
|
// Customize the Transport to have larger connection pool (default is only 2 per host)
|
||||||
t := http.DefaultTransport.(*http.Transport).Clone()
|
t := http.DefaultTransport.(*http.Transport).Clone() //nolint:forcetypeassert
|
||||||
t.MaxConnsPerHost = nearRPCConcurrentConnections
|
t.MaxConnsPerHost = nearRPCConcurrentConnections
|
||||||
t.MaxIdleConnsPerHost = nearRPCConcurrentConnections
|
t.MaxIdleConnsPerHost = nearRPCConcurrentConnections
|
||||||
var httpClient = &http.Client{
|
var httpClient = &http.Client{
|
||||||
|
|
|
@ -65,7 +65,7 @@ func NewBlockFromBytes(bytes []byte) (Block, error) {
|
||||||
json := gjson.ParseBytes(bytes)
|
json := gjson.ParseBytes(bytes)
|
||||||
|
|
||||||
ts_nanosec := jsonGetUint(json, "result.header.timestamp")
|
ts_nanosec := jsonGetUint(json, "result.header.timestamp")
|
||||||
ts := uint64(ts_nanosec) / 1_000_000_000
|
ts := ts_nanosec / 1_000_000_000
|
||||||
|
|
||||||
header := BlockHeader{
|
header := BlockHeader{
|
||||||
jsonGetString(json, "result.header.hash"),
|
jsonGetString(json, "result.header.hash"),
|
||||||
|
@ -84,7 +84,7 @@ func NewBlockFromBytes(bytes []byte) (Block, error) {
|
||||||
|
|
||||||
func (b Block) Timestamp() uint64 {
|
func (b Block) Timestamp() uint64 {
|
||||||
ts_nanosec := jsonGetUint(b.json, "result.header.timestamp")
|
ts_nanosec := jsonGetUint(b.json, "result.header.timestamp")
|
||||||
return uint64(ts_nanosec) / 1000000000
|
return ts_nanosec / 1000000000
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b Block) ChunkHashes() []ChunkHeader {
|
func (b Block) ChunkHashes() []ChunkHeader {
|
||||||
|
|
|
@ -12,8 +12,6 @@ import (
|
||||||
func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, chunkHeader nearapi.ChunkHeader) ([]*transactionProcessingJob, error) {
|
func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, chunkHeader nearapi.ChunkHeader) ([]*transactionProcessingJob, error) {
|
||||||
logger.Debug("near.fetchAndParseChunk", zap.String("chunk_hash", chunkHeader.Hash))
|
logger.Debug("near.fetchAndParseChunk", zap.String("chunk_hash", chunkHeader.Hash))
|
||||||
|
|
||||||
var result []*transactionProcessingJob
|
|
||||||
|
|
||||||
chunk, err := e.nearAPI.GetChunk(ctx, chunkHeader)
|
chunk, err := e.nearAPI.GetChunk(ctx, chunkHeader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -21,15 +19,16 @@ func (e *Watcher) fetchAndParseChunk(logger *zap.Logger, ctx context.Context, ch
|
||||||
|
|
||||||
txns := chunk.Transactions()
|
txns := chunk.Transactions()
|
||||||
|
|
||||||
for _, tx := range txns {
|
result := make([]*transactionProcessingJob, len(txns))
|
||||||
result = append(result, newTransactionProcessingJob(tx.Hash, tx.SignerId))
|
for i, tx := range txns {
|
||||||
|
result[i] = newTransactionProcessingJob(tx.Hash, tx.SignerId)
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// recursivelyReadFinalizedBlocks walks back the blockchain from the startBlock (inclusive)
|
// recursivelyReadFinalizedBlocks walks back the blockchain from the startBlock (inclusive)
|
||||||
// until it reaches a block of height stopHeight or less (exclusive). Chunks from all these blocks are put
|
// until it reaches a block of height stopHeight or less (exclusive). Chunks from all these blocks are put
|
||||||
// into e.chunkProcessingqueue with the chunks from the oldest block first
|
// into chunkSink with the chunks from the oldest block first
|
||||||
// if there is an error while walking back the chain, no chunks will be returned
|
// if there is an error while walking back the chain, no chunks will be returned
|
||||||
func (e *Watcher) recursivelyReadFinalizedBlocks(logger *zap.Logger, ctx context.Context, startBlock nearapi.Block, stopHeight uint64, chunkSink chan<- nearapi.ChunkHeader, recursionDepth uint) error {
|
func (e *Watcher) recursivelyReadFinalizedBlocks(logger *zap.Logger, ctx context.Context, startBlock nearapi.Block, stopHeight uint64, chunkSink chan<- nearapi.ChunkHeader, recursionDepth uint) error {
|
||||||
|
|
||||||
|
@ -39,7 +38,7 @@ func (e *Watcher) recursivelyReadFinalizedBlocks(logger *zap.Logger, ctx context
|
||||||
}
|
}
|
||||||
|
|
||||||
// SECURITY: We know that this block is finalized because it is a parent of a finalized block.
|
// SECURITY: We know that this block is finalized because it is a parent of a finalized block.
|
||||||
e.finalizer.setFinalized(logger, ctx, startBlock.Header)
|
e.finalizer.setFinalized(startBlock.Header)
|
||||||
|
|
||||||
// we want to avoid going too far back because that would increase the likelihood of error somewhere in the recursion stack.
|
// we want to avoid going too far back because that would increase the likelihood of error somewhere in the recursion stack.
|
||||||
// If we go back too far, we just report the error and terminate early.
|
// If we go back too far, we just report the error and terminate early.
|
||||||
|
@ -70,7 +69,7 @@ func (e *Watcher) recursivelyReadFinalizedBlocks(logger *zap.Logger, ctx context
|
||||||
chunks := startBlock.ChunkHashes()
|
chunks := startBlock.ChunkHashes()
|
||||||
// process chunks after recursion such that youngest chunks get processed first
|
// process chunks after recursion such that youngest chunks get processed first
|
||||||
for i := 0; i < len(chunks); i++ {
|
for i := 0; i < len(chunks); i++ {
|
||||||
e.chunkProcessingQueue <- chunks[i]
|
chunkSink <- chunks[i]
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,7 +115,7 @@ func (e *Watcher) processOutcome(logger *zap.Logger, ctx context.Context, job *t
|
||||||
return nil // SUCCESS
|
return nil // SUCCESS
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Watcher) processWormholeLog(logger *zap.Logger, ctx context.Context, job *transactionProcessingJob, outcomeBlockHeader nearapi.BlockHeader, successValue string, log gjson.Result) error {
|
func (e *Watcher) processWormholeLog(logger *zap.Logger, _ context.Context, job *transactionProcessingJob, outcomeBlockHeader nearapi.BlockHeader, successValue string, log gjson.Result) error {
|
||||||
event := log.String()
|
event := log.String()
|
||||||
|
|
||||||
// SECURITY CRITICAL: Ensure that we're reading a correct log message.
|
// SECURITY CRITICAL: Ensure that we're reading a correct log message.
|
||||||
|
|
Loading…
Reference in New Issue