qLogger.Error("chain does not support cross chain queries",zap.String("requestID",requestID),zap.Stringer("chainID",chainID))
errorFound=true
break
}
channel,channelExists:=chainQueryReqC[chainID]
if!channelExists{
qLogger.Error("unknown chain ID for query request, dropping it",zap.String("requestID",requestID),zap.Stringer("chain_id",chainID))
errorFound=true
break
}
queries=append(queries,&perChainQuery{
req:&PerChainQueryInternal{
RequestID:requestID,
RequestIdx:requestIdx,
Request:pcq,
},
channel:channel,
})
}
iferrorFound{
continue
}
// Create the pending query and add it to the cache.
pq:=&pendingQuery{
signedRequest:signedRequest,
request:&queryRequest,
requestID:requestID,
receiveTime:receiveTime,
queries:queries,
responses:responses,
}
pendingQueries[requestID]=pq
// Forward the requests to the watchers.
for_,pcq:=rangepq.queries{
pcq.ccqForwardToWatcher(qLogger,pq.receiveTime)
}
caseresp:=<-queryResponseReadC:// Response from a watcher.
ifresp.Status==QuerySuccess{
ifresp.Response==nil{
qLogger.Error("received a successful query response with no results, dropping it!",zap.String("requestID",resp.RequestID))
continue
}
pq,exists:=pendingQueries[resp.RequestID]
if!exists{
qLogger.Warn("received a success response with no outstanding query, dropping it",zap.String("requestID",resp.RequestID),zap.Int("requestIdx",resp.RequestIdx))
continue
}
ifresp.RequestIdx>=len(pq.responses){
qLogger.Error("received a response with an invalid index",zap.String("requestID",resp.RequestID),zap.Int("requestIdx",resp.RequestIdx))
continue
}
// Store the result, which will mark this per-chain query as completed.
pq.responses[resp.RequestIdx]=resp
// If we still have other outstanding per chain queries for this request, keep waiting.
numStillPending:=pq.numPendingRequests()
ifnumStillPending>0{
qLogger.Info("received a per chain query response, still waiting for more",zap.String("requestID",resp.RequestID),zap.Int("requestIdx",resp.RequestIdx),zap.Int("numStillPending",numStillPending))
continue
}else{
qLogger.Info("received final per chain query response, ready to publish",zap.String("requestID",resp.RequestID),zap.Int("requestIdx",resp.RequestIdx))
}
// Build the list of per chain response publications and the overall query response publication.
responses:=[]*PerChainQueryResponse{}
for_,resp:=rangepq.responses{
ifresp==nil{
qLogger.Error("unexpected null response in pending query!",zap.String("requestID",resp.RequestID),zap.Int("requestIdx",resp.RequestIdx))
qLogger.Warn("query failed, will retry next interval",zap.String("requestID",resp.RequestID),zap.Int("requestIdx",resp.RequestIdx))
}else{
qLogger.Warn("received a retry needed response with no outstanding query, dropping it",zap.String("requestID",resp.RequestID),zap.Int("requestIdx",resp.RequestIdx))
}
}elseifresp.Status==QueryFatalError{
qLogger.Error("received a fatal error response, dropping the whole request",zap.String("requestID",resp.RequestID),zap.Int("requestIdx",resp.RequestIdx))
delete(pendingQueries,resp.RequestID)
}else{
qLogger.Error("received an unexpected query status, dropping the whole request",zap.String("requestID",resp.RequestID),zap.Int("requestIdx",resp.RequestIdx),zap.Int("status",int(resp.Status)))
// TODO: only send the query request itself and reassemble in this module
casepcq.channel<-pcq.req:
qLogger.Debug("forwarded query request to watcher",zap.String("requestID",pcq.req.RequestID),zap.Stringer("chainID",pcq.req.Request.ChainId))
pcq.lastUpdateTime=receiveTime
default:
// By leaving lastUpdateTime unset, we will retry next interval.
qLogger.Warn("failed to send query request to watcher, will retry next interval",zap.String("requestID",pcq.req.RequestID),zap.Stringer("chain_id",pcq.req.Request.ChainId))
}
}
// numPendingRequests returns the number of per chain queries in a request that are still awaiting responses. Zero means the request can now be published.