Node/Acct: Logging changes (#2574)

* Node/Acct: Logging changes

* Missed some!

* Switch to using logger.With()

* Still missed some.

* Missed one more

* Yet again

* Missed a few

* Don't need .With in AcctGetData
This commit is contained in:
bruce-riley 2023-04-06 13:32:00 -05:00 committed by GitHub
parent 048595f3da
commit a846036b6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 109 additions and 108 deletions

View File

@ -1011,7 +1011,7 @@ func runNode(cmd *cobra.Command, args []string) {
wormchainKeyPathName = fmt.Sprint(*wormchainKeyPath, idx) wormchainKeyPathName = fmt.Sprint(*wormchainKeyPath, idx)
} }
logger.Debug("acct: loading key file", zap.String("key path", wormchainKeyPathName)) logger.Debug("loading key file", zap.String("key path", wormchainKeyPathName))
wormchainKey, err = wormconn.LoadWormchainPrivKey(wormchainKeyPathName, *wormchainKeyPassPhrase) wormchainKey, err = wormconn.LoadWormchainPrivKey(wormchainKeyPathName, *wormchainKeyPassPhrase)
if err != nil { if err != nil {
logger.Fatal("failed to load wormchain private key", zap.Error(err)) logger.Fatal("failed to load wormchain private key", zap.Error(err))
@ -1029,20 +1029,21 @@ func runNode(cmd *cobra.Command, args []string) {
// will be passed to it for processing. It will forward all token bridge transfers to the accountant contract. // will be passed to it for processing. It will forward all token bridge transfers to the accountant contract.
// If accountantCheckEnabled is set to true, token bridge transfers will not be signed and published until they // If accountantCheckEnabled is set to true, token bridge transfers will not be signed and published until they
// are approved by the accountant smart contract. // are approved by the accountant smart contract.
acctLogger := logger.With(zap.String("component", "gacct"))
acctReadC, acctWriteC := makeChannelPair[*common.MessagePublication](0) acctReadC, acctWriteC := makeChannelPair[*common.MessagePublication](0)
var acct *accountant.Accountant var acct *accountant.Accountant
if *accountantContract != "" { if *accountantContract != "" {
if *accountantWS == "" { if *accountantWS == "" {
logger.Fatal("acct: if accountantContract is specified, accountantWS is required") acctLogger.Fatal("if accountantContract is specified, accountantWS is required")
} }
if wormchainConn == nil { if wormchainConn == nil {
logger.Fatal("acct: if accountantContract is specified, the wormchain sending connection must be enabled") acctLogger.Fatal("if accountantContract is specified, the wormchain sending connection must be enabled")
} }
if *accountantCheckEnabled { if *accountantCheckEnabled {
logger.Info("acct: accountant is enabled and will be enforced") acctLogger.Info("accountant is enabled and will be enforced")
} else { } else {
logger.Info("acct: accountant is enabled but will not be enforced") acctLogger.Info("accountant is enabled but will not be enforced")
} }
env := accountant.MainNetMode env := accountant.MainNetMode
if *testnetMode { if *testnetMode {
@ -1065,7 +1066,7 @@ func runNode(cmd *cobra.Command, args []string) {
env, env,
) )
} else { } else {
logger.Info("acct: accountant is disabled") acctLogger.Info("accountant is disabled")
} }
var gov *governor.ChainGovernor var gov *governor.ChainGovernor
@ -1425,7 +1426,7 @@ func runNode(cmd *cobra.Command, args []string) {
if acct != nil { if acct != nil {
if err := acct.Start(ctx); err != nil { if err := acct.Start(ctx); err != nil {
logger.Fatal("acct: failed to start accountant", zap.Error(err)) acctLogger.Fatal("failed to start accountant", zap.Error(err))
} }
} }

View File

@ -124,18 +124,18 @@ func testSubmit(
msgs := []*common.MessagePublication{&msg} msgs := []*common.MessagePublication{&msg}
txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs)
if err != nil { if err != nil {
logger.Error("acct: failed to broadcast Observation request", zap.String("test", tag), zap.Error(err)) logger.Error("failed to broadcast Observation request", zap.String("test", tag), zap.Error(err))
return false return false
} }
responses, err := accountant.GetObservationResponses(txResp) responses, err := accountant.GetObservationResponses(txResp)
if err != nil { if err != nil {
logger.Error("acct: failed to get responses", zap.Error(err)) logger.Error("failed to get responses", zap.Error(err))
return false return false
} }
if len(responses) != len(msgs) { if len(responses) != len(msgs) {
logger.Error("acct: number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) logger.Error("number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err))
return false return false
} }
@ -203,18 +203,18 @@ func testBatch(
txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs)
if err != nil { if err != nil {
logger.Error("acct: failed to broadcast Observation request", zap.Error(err)) logger.Error("failed to broadcast Observation request", zap.Error(err))
return false return false
} }
responses, err := accountant.GetObservationResponses(txResp) responses, err := accountant.GetObservationResponses(txResp)
if err != nil { if err != nil {
logger.Error("acct: failed to get responses", zap.Error(err)) logger.Error("failed to get responses", zap.Error(err))
return false return false
} }
if len(responses) != len(msgs) { if len(responses) != len(msgs) {
logger.Error("acct: number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) logger.Error("number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err))
return false return false
} }
@ -222,12 +222,12 @@ func testBatch(
msgId := msg.MessageIDString() msgId := msg.MessageIDString()
status, exists := responses[msgId] status, exists := responses[msgId]
if !exists { if !exists {
logger.Error("acct: did not receive an observation response for message", zap.Int("idx", idx), zap.String("msgId", msgId)) logger.Error("did not receive an observation response for message", zap.Int("idx", idx), zap.String("msgId", msgId))
return false return false
} }
if status.Type != "committed" { if status.Type != "committed" {
logger.Error("acct: unexpected response on observation", zap.Int("idx", idx), zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data)) logger.Error("unexpected response on observation", zap.Int("idx", idx), zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data))
return false return false
} }
} }
@ -268,7 +268,7 @@ func testBatchWithcommitted(
_, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) _, err := submit(ctx, logger, gk, wormchainConn, contract, msgs)
if err != nil { if err != nil {
logger.Error("acct: failed to submit initial observation that should work", zap.Error(err)) logger.Error("failed to submit initial observation that should work", zap.Error(err))
return false return false
} }
@ -295,18 +295,18 @@ func testBatchWithcommitted(
txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs)
if err != nil { if err != nil {
logger.Error("acct: failed to broadcast Observation request", zap.Error(err)) logger.Error("failed to broadcast Observation request", zap.Error(err))
return false return false
} }
responses, err := accountant.GetObservationResponses(txResp) responses, err := accountant.GetObservationResponses(txResp)
if err != nil { if err != nil {
logger.Error("acct: failed to get responses", zap.Error(err)) logger.Error("failed to get responses", zap.Error(err))
return false return false
} }
if len(responses) != len(msgs) { if len(responses) != len(msgs) {
logger.Error("acct: number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) logger.Error("number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err))
return false return false
} }
@ -314,12 +314,12 @@ func testBatchWithcommitted(
msgId := msg.MessageIDString() msgId := msg.MessageIDString()
status, exists := responses[msgId] status, exists := responses[msgId]
if !exists { if !exists {
logger.Error("acct: did not receive an observation response for message", zap.Int("idx", idx), zap.String("msgId", msgId)) logger.Error("did not receive an observation response for message", zap.Int("idx", idx), zap.String("msgId", msgId))
return false return false
} }
if status.Type != "committed" { if status.Type != "committed" {
logger.Error("acct: unexpected response on observation", zap.Int("idx", idx), zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data)) logger.Error("unexpected response on observation", zap.Int("idx", idx), zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data))
return false return false
} }
} }
@ -360,7 +360,7 @@ func testBatchWithDigestError(
_, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) _, err := submit(ctx, logger, gk, wormchainConn, contract, msgs)
if err != nil { if err != nil {
logger.Error("acct: failed to submit initial observation that should work", zap.Error(err)) logger.Error("failed to submit initial observation that should work", zap.Error(err))
return false return false
} }
@ -388,47 +388,47 @@ func testBatchWithDigestError(
txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs) txResp, err := submit(ctx, logger, gk, wormchainConn, contract, msgs)
if err != nil { if err != nil {
logger.Error("acct: failed to submit second observation that should work", zap.Error(err)) logger.Error("failed to submit second observation that should work", zap.Error(err))
return false return false
} }
responses, err := accountant.GetObservationResponses(txResp) responses, err := accountant.GetObservationResponses(txResp)
if err != nil { if err != nil {
logger.Error("acct: failed to get responses", zap.Error(err)) logger.Error("failed to get responses", zap.Error(err))
return false return false
} }
if len(responses) != len(msgs) { if len(responses) != len(msgs) {
logger.Error("acct: number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) logger.Error("number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err))
return false return false
} }
msgId := msgs[0].MessageIDString() msgId := msgs[0].MessageIDString()
status, exists := responses[msgId] status, exists := responses[msgId]
if !exists { if !exists {
logger.Error("acct: did not receive an observation response for message 0", zap.String("msgId", msgId)) logger.Error("did not receive an observation response for message 0", zap.String("msgId", msgId))
return false return false
} }
if status.Type != "committed" { if status.Type != "committed" {
logger.Error("acct: unexpected response on observation for message 0", zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data)) logger.Error("unexpected response on observation for message 0", zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data))
return false return false
} }
msgId = msgs[1].MessageIDString() msgId = msgs[1].MessageIDString()
status, exists = responses[msgId] status, exists = responses[msgId]
if !exists { if !exists {
logger.Error("acct: did not receive an observation response for message 1", zap.String("msgId", msgId)) logger.Error("did not receive an observation response for message 1", zap.String("msgId", msgId))
return false return false
} }
if status.Type != "error" { if status.Type != "error" {
logger.Error("acct: unexpected response on observation for message 1", zap.String("status", status.Type), zap.String("text", status.Data)) logger.Error("unexpected response on observation for message 1", zap.String("status", status.Type), zap.String("text", status.Data))
return false return false
} }
if status.Data != "digest mismatch for processed message" { if status.Data != "digest mismatch for processed message" {
logger.Error("acct: unexpected error text on observation for message 1", zap.String("text", status.Data)) logger.Error("unexpected error text on observation for message 1", zap.String("text", status.Data))
return false return false
} }

View File

@ -107,7 +107,7 @@ func NewAccountant(
) *Accountant { ) *Accountant {
return &Accountant{ return &Accountant{
ctx: ctx, ctx: ctx,
logger: logger, logger: logger.With(zap.String("component", "gacct")),
db: db, db: db,
obsvReqWriteC: obsvReqWriteC, obsvReqWriteC: obsvReqWriteC,
contract: contract, contract: contract,
@ -127,7 +127,7 @@ func NewAccountant(
// Run initializes the accountant and starts the watcher runnable. // Run initializes the accountant and starts the watcher runnable.
func (acct *Accountant) Start(ctx context.Context) error { func (acct *Accountant) Start(ctx context.Context) error {
acct.logger.Debug("acct: entering Start", zap.Bool("enforceFlag", acct.enforceFlag)) acct.logger.Debug("entering Start", zap.Bool("enforceFlag", acct.enforceFlag))
acct.pendingTransfersLock.Lock() acct.pendingTransfersLock.Lock()
defer acct.pendingTransfersLock.Unlock() defer acct.pendingTransfersLock.Unlock()
@ -153,7 +153,7 @@ func (acct *Accountant) Start(ctx context.Context) error {
tbe := &tokenBridgeEntry{} tbe := &tokenBridgeEntry{}
acct.tokenBridges[tbk] = tbe acct.tokenBridges[tbk] = tbe
acct.logger.Info("acct: will monitor token bridge:", zap.Stringer("emitterChainId", tbk.emitterChainId), zap.Stringer("emitterAddr", tbk.emitterAddr)) acct.logger.Info("will monitor token bridge:", zap.Stringer("emitterChainId", tbk.emitterChainId), zap.Stringer("emitterAddr", tbk.emitterAddr))
} }
// Load any existing pending transfers from the db. // Load any existing pending transfers from the db.
@ -201,7 +201,7 @@ func (acct *Accountant) IsMessageCoveredByAccountant(msg *common.MessagePublicat
tbk := tokenBridgeKey{emitterChainId: msg.EmitterChain, emitterAddr: msg.EmitterAddress} tbk := tokenBridgeKey{emitterChainId: msg.EmitterChain, emitterAddr: msg.EmitterAddress}
if _, exists := acct.tokenBridges[tbk]; !exists { if _, exists := acct.tokenBridges[tbk]; !exists {
if msg.EmitterChain != vaa.ChainIDPythNet { if msg.EmitterChain != vaa.ChainIDPythNet {
acct.logger.Debug("acct: ignoring vaa because it is not a token bridge", zap.String("msgID", msgId)) acct.logger.Debug("ignoring vaa because it is not a token bridge", zap.String("msgID", msgId))
} }
return false return false
@ -209,7 +209,7 @@ func (acct *Accountant) IsMessageCoveredByAccountant(msg *common.MessagePublicat
// We only care about transfers. // We only care about transfers.
if !vaa.IsTransfer(msg.Payload) { if !vaa.IsTransfer(msg.Payload) {
acct.logger.Info("acct: ignoring vaa because it is not a transfer", zap.String("msgID", msgId)) acct.logger.Info("ignoring vaa because it is not a transfer", zap.String("msgID", msgId))
return false return false
} }
@ -221,7 +221,7 @@ func (acct *Accountant) IsMessageCoveredByAccountant(msg *common.MessagePublicat
// false if not (because it has been submitted to accountant). // false if not (because it has been submitted to accountant).
func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool, error) { func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool, error) {
msgId := msg.MessageIDString() msgId := msg.MessageIDString()
acct.logger.Debug("acct: in SubmitObservation", zap.String("msgID", msgId)) acct.logger.Debug("in SubmitObservation", zap.String("msgID", msgId))
if !acct.IsMessageCoveredByAccountant(msg) { if !acct.IsMessageCoveredByAccountant(msg) {
return true, nil return true, nil
@ -236,14 +236,14 @@ func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool,
if oldEntry, exists := acct.pendingTransfers[msgId]; exists { if oldEntry, exists := acct.pendingTransfers[msgId]; exists {
if oldEntry.digest != digest { if oldEntry.digest != digest {
digestMismatches.Inc() digestMismatches.Inc()
acct.logger.Error("acct: digest in pending transfer has changed, dropping it", acct.logger.Error("digest in pending transfer has changed, dropping it",
zap.String("msgID", msgId), zap.String("msgID", msgId),
zap.String("oldDigest", oldEntry.digest), zap.String("oldDigest", oldEntry.digest),
zap.String("newDigest", digest), zap.String("newDigest", digest),
zap.Bool("enforcing", acct.enforceFlag), zap.Bool("enforcing", acct.enforceFlag),
) )
} else { } else {
acct.logger.Info("acct: blocking transfer because it is already outstanding", zap.String("msgID", msgId), zap.Bool("enforcing", acct.enforceFlag)) acct.logger.Info("blocking transfer because it is already outstanding", zap.String("msgID", msgId), zap.Bool("enforcing", acct.enforceFlag))
} }
return !acct.enforceFlag, nil return !acct.enforceFlag, nil
} }
@ -251,13 +251,13 @@ func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool,
// Add it to the pending map and the database. // Add it to the pending map and the database.
pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest} pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest}
if err := acct.addPendingTransferAlreadyLocked(pe); err != nil { if err := acct.addPendingTransferAlreadyLocked(pe); err != nil {
acct.logger.Error("acct: failed to persist pending transfer, blocking publishing", zap.String("msgID", msgId), zap.Error(err)) acct.logger.Error("failed to persist pending transfer, blocking publishing", zap.String("msgID", msgId), zap.Error(err))
return false, err return false, err
} }
// This transaction may take a while. Pass it off to the worker so we don't block the processor. // This transaction may take a while. Pass it off to the worker so we don't block the processor.
if acct.env != GoTestMode { if acct.env != GoTestMode {
acct.logger.Info("acct: submitting transfer to accountant for approval", zap.String("msgID", msgId), zap.Bool("canPublish", !acct.enforceFlag)) acct.logger.Info("submitting transfer to accountant for approval", zap.String("msgID", msgId), zap.Bool("canPublish", !acct.enforceFlag))
_ = acct.submitObservation(pe) _ = acct.submitObservation(pe)
} }
@ -268,7 +268,7 @@ func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool,
// publishTransferAlreadyLocked publishes a pending transfer to the accountant channel and updates the timestamp. It assumes the caller holds the lock. // publishTransferAlreadyLocked publishes a pending transfer to the accountant channel and updates the timestamp. It assumes the caller holds the lock.
func (acct *Accountant) publishTransferAlreadyLocked(pe *pendingEntry) { func (acct *Accountant) publishTransferAlreadyLocked(pe *pendingEntry) {
if acct.enforceFlag { if acct.enforceFlag {
acct.logger.Debug("acct: publishTransferAlreadyLocked: notifying the processor", zap.String("msgId", pe.msgId)) acct.logger.Debug("publishTransferAlreadyLocked: notifying the processor", zap.String("msgId", pe.msgId))
acct.msgChan <- pe.msg acct.msgChan <- pe.msg
} }
@ -277,7 +277,7 @@ func (acct *Accountant) publishTransferAlreadyLocked(pe *pendingEntry) {
// addPendingTransferAlreadyLocked adds a pending transfer to both the map and the database. It assumes the caller holds the lock. // addPendingTransferAlreadyLocked adds a pending transfer to both the map and the database. It assumes the caller holds the lock.
func (acct *Accountant) addPendingTransferAlreadyLocked(pe *pendingEntry) error { func (acct *Accountant) addPendingTransferAlreadyLocked(pe *pendingEntry) error {
acct.logger.Debug("acct: addPendingTransferAlreadyLocked", zap.String("msgId", pe.msgId)) acct.logger.Debug("addPendingTransferAlreadyLocked", zap.String("msgId", pe.msgId))
pe.setUpdTime() pe.setUpdTime()
if err := acct.db.AcctStorePendingTransfer(pe.msg); err != nil { if err := acct.db.AcctStorePendingTransfer(pe.msg); err != nil {
return err return err
@ -297,13 +297,13 @@ func (acct *Accountant) deletePendingTransfer(msgId string) {
// deletePendingTransferAlreadyLocked deletes the transfer from both the map and the database. It assumes the caller holds the lock. // deletePendingTransferAlreadyLocked deletes the transfer from both the map and the database. It assumes the caller holds the lock.
func (acct *Accountant) deletePendingTransferAlreadyLocked(msgId string) { func (acct *Accountant) deletePendingTransferAlreadyLocked(msgId string) {
acct.logger.Debug("acct: deletePendingTransfer", zap.String("msgId", msgId)) acct.logger.Debug("deletePendingTransfer", zap.String("msgId", msgId))
if _, exists := acct.pendingTransfers[msgId]; exists { if _, exists := acct.pendingTransfers[msgId]; exists {
delete(acct.pendingTransfers, msgId) delete(acct.pendingTransfers, msgId)
transfersOutstanding.Set(float64(len(acct.pendingTransfers))) transfersOutstanding.Set(float64(len(acct.pendingTransfers)))
} }
if err := acct.db.AcctDeletePendingTransfer(msgId); err != nil { if err := acct.db.AcctDeletePendingTransfer(msgId); err != nil {
acct.logger.Error("acct: failed to delete pending transfer from the db", zap.String("msgId", msgId), zap.Error(err)) acct.logger.Error("failed to delete pending transfer from the db", zap.String("msgId", msgId), zap.Error(err))
// Ignore this error and keep going. // Ignore this error and keep going.
} }
} }
@ -317,7 +317,7 @@ func (acct *Accountant) loadPendingTransfers() error {
for _, msg := range pendingTransfers { for _, msg := range pendingTransfers {
msgId := msg.MessageIDString() msgId := msg.MessageIDString()
acct.logger.Info("acct: reloaded pending transfer", zap.String("msgID", msgId)) acct.logger.Info("reloaded pending transfer", zap.String("msgID", msgId))
digest := msg.CreateDigest() digest := msg.CreateDigest()
pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest} pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest}
@ -327,9 +327,9 @@ func (acct *Accountant) loadPendingTransfers() error {
transfersOutstanding.Set(float64(len(acct.pendingTransfers))) transfersOutstanding.Set(float64(len(acct.pendingTransfers)))
if len(acct.pendingTransfers) != 0 { if len(acct.pendingTransfers) != 0 {
acct.logger.Info("acct: reloaded pending transfers", zap.Int("total", len(acct.pendingTransfers))) acct.logger.Info("reloaded pending transfers", zap.Int("total", len(acct.pendingTransfers)))
} else { } else {
acct.logger.Info("acct: no pending transfers to be reloaded") acct.logger.Info("no pending transfers to be reloaded")
} }
return nil return nil
@ -352,9 +352,9 @@ func (acct *Accountant) submitObservation(pe *pendingEntry) bool {
select { select {
case acct.subChan <- pe.msg: case acct.subChan <- pe.msg:
acct.logger.Debug("acct: submitted observation to channel", zap.String("msgId", pe.msgId)) acct.logger.Debug("submitted observation to channel", zap.String("msgId", pe.msgId))
default: default:
acct.logger.Error("acct: unable to submit observation because the channel is full, will try next interval", zap.String("msgId", pe.msgId)) acct.logger.Error("unable to submit observation because the channel is full, will try next interval", zap.String("msgId", pe.msgId))
pe.state.submitPending = false pe.state.submitPending = false
} }

View File

@ -129,9 +129,9 @@ func (acct *Accountant) audit(ctx context.Context) error {
// runAudit is the entry point for the audit of the pending transfer map. It creates a temporary map of all pending transfers and invokes the main audit function. // runAudit is the entry point for the audit of the pending transfer map. It creates a temporary map of all pending transfers and invokes the main audit function.
func (acct *Accountant) runAudit() { func (acct *Accountant) runAudit() {
tmpMap := acct.createAuditMap() tmpMap := acct.createAuditMap()
acct.logger.Debug("acctaudit: in AuditPendingTransfers: starting audit", zap.Int("numPending", len(tmpMap))) acct.logger.Debug("in AuditPendingTransfers: starting audit", zap.Int("numPending", len(tmpMap)))
acct.performAudit(tmpMap) acct.performAudit(tmpMap)
acct.logger.Debug("acctaudit: leaving AuditPendingTransfers") acct.logger.Debug("leaving AuditPendingTransfers")
} }
// createAuditMap creates a temporary map of all pending transfers. It grabs the pending transfer lock. // createAuditMap creates a temporary map of all pending transfers. It grabs the pending transfer lock.
@ -143,10 +143,10 @@ func (acct *Accountant) createAuditMap() map[string]*pendingEntry {
for _, pe := range acct.pendingTransfers { for _, pe := range acct.pendingTransfers {
if pe.hasBeenPendingForTooLong() { if pe.hasBeenPendingForTooLong() {
auditErrors.Inc() auditErrors.Inc()
acct.logger.Error("acctaudit: transfer has been in the submit pending state for too long", zap.Stringer("lastUpdateTime", pe.updTime())) acct.logger.Error("transfer has been in the submit pending state for too long", zap.Stringer("lastUpdateTime", pe.updTime()))
} }
key := pe.makeAuditKey() key := pe.makeAuditKey()
acct.logger.Debug("acctaudit: will audit pending transfer", zap.String("msgId", pe.msgId), zap.String("moKey", key), zap.Bool("submitPending", pe.submitPending()), zap.Stringer("lastUpdateTime", pe.updTime())) acct.logger.Debug("will audit pending transfer", zap.String("msgId", pe.msgId), zap.String("moKey", key), zap.Bool("submitPending", pe.submitPending()), zap.Stringer("lastUpdateTime", pe.updTime()))
tmpMap[key] = pe tmpMap[key] = pe
} }
@ -163,12 +163,12 @@ func (pe *pendingEntry) hasBeenPendingForTooLong() bool {
// performAudit audits the temporary map against the smart contract. It is meant to be run in a go routine. It takes a temporary map of all pending transfers // performAudit audits the temporary map against the smart contract. It is meant to be run in a go routine. It takes a temporary map of all pending transfers
// and validates that against what is reported by the smart contract. For more details, please see the prologue of this file. // and validates that against what is reported by the smart contract. For more details, please see the prologue of this file.
func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) { func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) {
acct.logger.Debug("acctaudit: entering performAudit") acct.logger.Debug("entering performAudit")
missingObservations, err := acct.queryMissingObservations() missingObservations, err := acct.queryMissingObservations()
if err != nil { if err != nil {
acct.logger.Error("acctaudit: unable to perform audit, failed to query missing observations", zap.Error(err)) acct.logger.Error("unable to perform audit, failed to query missing observations", zap.Error(err))
for _, pe := range tmpMap { for _, pe := range tmpMap {
acct.logger.Error("acctaudit: unsure of status of pending transfer due to query error", zap.String("msgId", pe.msgId)) acct.logger.Error("unsure of status of pending transfer due to query error", zap.String("msgId", pe.msgId))
} }
return return
} }
@ -179,9 +179,9 @@ func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) {
if exists { if exists {
if acct.submitObservation(pe) { if acct.submitObservation(pe) {
auditErrors.Inc() auditErrors.Inc()
acct.logger.Error("acctaudit: contract reported pending observation as missing, resubmitted it", zap.String("msgID", pe.msgId)) acct.logger.Error("contract reported pending observation as missing, resubmitted it", zap.String("msgID", pe.msgId))
} else { } else {
acct.logger.Info("acctaudit: contract reported pending observation as missing but it is queued up to be submitted, skipping it", zap.String("msgID", pe.msgId)) acct.logger.Info("contract reported pending observation as missing but it is queued up to be submitted, skipping it", zap.String("msgID", pe.msgId))
} }
delete(tmpMap, key) delete(tmpMap, key)
@ -200,9 +200,9 @@ func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) {
transferDetails, err := acct.queryBatchTransferStatus(keys) transferDetails, err := acct.queryBatchTransferStatus(keys)
if err != nil { if err != nil {
acct.logger.Error("acctaudit: unable to finish audit, failed to query for transfer statuses", zap.Error(err)) acct.logger.Error("unable to finish audit, failed to query for transfer statuses", zap.Error(err))
for _, pe := range tmpMap { for _, pe := range tmpMap {
acct.logger.Error("acctaudit: unsure of status of pending transfer due to query error", zap.String("msgId", pe.msgId)) acct.logger.Error("unsure of status of pending transfer due to query error", zap.String("msgId", pe.msgId))
} }
return return
} }
@ -212,9 +212,9 @@ func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) {
if !exists { if !exists {
if acct.submitObservation(pe) { if acct.submitObservation(pe) {
auditErrors.Inc() auditErrors.Inc()
acct.logger.Error("acctaudit: query did not return status for transfer, this should not happen, resubmitted it", zap.String("msgId", pe.msgId)) acct.logger.Error("query did not return status for transfer, this should not happen, resubmitted it", zap.String("msgId", pe.msgId))
} else { } else {
acct.logger.Info("acctaudit: query did not return status for transfer we have not submitted yet, ignoring it", zap.String("msgId", pe.msgId)) acct.logger.Info("query did not return status for transfer we have not submitted yet, ignoring it", zap.String("msgId", pe.msgId))
} }
continue continue
@ -224,48 +224,48 @@ func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) {
// This is the case when the contract does not know about a transfer. Resubmit it. // This is the case when the contract does not know about a transfer. Resubmit it.
if acct.submitObservation(pe) { if acct.submitObservation(pe) {
auditErrors.Inc() auditErrors.Inc()
acct.logger.Error("acctaudit: contract does not know about pending transfer, resubmitted it", zap.String("msgId", pe.msgId)) acct.logger.Error("contract does not know about pending transfer, resubmitted it", zap.String("msgId", pe.msgId))
} }
} else if status.Committed != nil { } else if status.Committed != nil {
digest := hex.EncodeToString(status.Committed.Digest) digest := hex.EncodeToString(status.Committed.Digest)
if pe.digest == digest { if pe.digest == digest {
acct.logger.Error("acctaudit: audit determined that transfer has been committed, publishing it", zap.String("msgId", pe.msgId)) acct.logger.Warn("audit determined that transfer has been committed, publishing it", zap.String("msgId", pe.msgId))
acct.handleCommittedTransfer(pe.msgId) acct.handleCommittedTransfer(pe.msgId)
} else { } else {
digestMismatches.Inc() digestMismatches.Inc()
acct.logger.Error("acctaudit: audit detected a digest mismatch, dropping transfer", zap.String("msgId", pe.msgId), zap.String("ourDigest", pe.digest), zap.String("reportedDigest", digest)) acct.logger.Error("audit detected a digest mismatch, dropping transfer", zap.String("msgId", pe.msgId), zap.String("ourDigest", pe.digest), zap.String("reportedDigest", digest))
acct.deletePendingTransfer(pe.msgId) acct.deletePendingTransfer(pe.msgId)
} }
} else if status.Pending != nil { } else if status.Pending != nil {
acct.logger.Debug("acctaudit: contract says transfer is still pending", zap.String("msgId", pe.msgId)) acct.logger.Debug("contract says transfer is still pending", zap.String("msgId", pe.msgId))
} else { } else {
// This is the case when the contract does not know about a transfer. Resubmit it. // This is the case when the contract does not know about a transfer. Resubmit it.
if acct.submitObservation(pe) { if acct.submitObservation(pe) {
auditErrors.Inc() auditErrors.Inc()
bytes, err := json.Marshal(*status) bytes, err := json.Marshal(*status)
if err != nil { if err != nil {
acct.logger.Error("acctaudit: unknown status returned for pending transfer, resubmitted it", zap.String("msgId", pe.msgId), zap.Error(err)) acct.logger.Error("unknown status returned for pending transfer, resubmitted it", zap.String("msgId", pe.msgId), zap.Error(err))
} else { } else {
acct.logger.Error("acctaudit: unknown status returned for pending transfer, resubmitted it", zap.String("msgId", pe.msgId), zap.String("status", string(bytes))) acct.logger.Error("unknown status returned for pending transfer, resubmitted it", zap.String("msgId", pe.msgId), zap.String("status", string(bytes)))
} }
} }
} }
} }
} }
acct.logger.Debug("acctaudit: exiting performAudit") acct.logger.Debug("exiting performAudit")
} }
// handleMissingObservation submits a local reobservation request. It relies on the reobservation code to throttle requests. // handleMissingObservation submits a local reobservation request. It relies on the reobservation code to throttle requests.
func (acct *Accountant) handleMissingObservation(mo MissingObservation) { func (acct *Accountant) handleMissingObservation(mo MissingObservation) {
acct.logger.Error("acctaudit: contract reported unknown observation as missing, requesting local reobservation", zap.Stringer("moKey", mo)) acct.logger.Warn("contract reported unknown observation as missing, requesting local reobservation", zap.Stringer("moKey", mo))
msg := &gossipv1.ObservationRequest{ChainId: uint32(mo.ChainId), TxHash: mo.TxHash} msg := &gossipv1.ObservationRequest{ChainId: uint32(mo.ChainId), TxHash: mo.TxHash}
select { select {
case acct.obsvReqWriteC <- msg: case acct.obsvReqWriteC <- msg:
acct.logger.Debug("acctaudit: submitted local reobservation", zap.Stringer("moKey", mo)) acct.logger.Debug("submitted local reobservation", zap.Stringer("moKey", mo))
default: default:
acct.logger.Error("acctaudit: unable to submit local reobservation because the channel is full, will try next interval", zap.Stringer("moKey", mo)) acct.logger.Error("unable to submit local reobservation because the channel is full, will try next interval", zap.Stringer("moKey", mo))
} }
} }
@ -282,7 +282,7 @@ func (acct *Accountant) queryMissingObservations() ([]MissingObservation, error)
} }
query := fmt.Sprintf(`{"missing_observations":{"guardian_set": %d, "index": %d}}`, gs.Index, guardianIndex) query := fmt.Sprintf(`{"missing_observations":{"guardian_set": %d, "index": %d}}`, gs.Index, guardianIndex)
acct.logger.Debug("acctaudit: submitting missing_observations query", zap.String("query", query)) acct.logger.Debug("submitting missing_observations query", zap.String("query", query))
respBytes, err := acct.wormchainConn.SubmitQuery(acct.ctx, acct.contract, []byte(query)) respBytes, err := acct.wormchainConn.SubmitQuery(acct.ctx, acct.contract, []byte(query))
if err != nil { if err != nil {
return nil, fmt.Errorf("missing_observations query failed: %w, %s", err, query) return nil, fmt.Errorf("missing_observations query failed: %w, %s", err, query)
@ -293,7 +293,7 @@ func (acct *Accountant) queryMissingObservations() ([]MissingObservation, error)
return nil, fmt.Errorf("failed to parse missing_observations response: %w, resp: %s", err, string(respBytes)) return nil, fmt.Errorf("failed to parse missing_observations response: %w, resp: %s", err, string(respBytes))
} }
acct.logger.Debug("acctaudit: missing_observations query response", zap.Int("numEntries", len(ret.Missing)), zap.String("result", string(respBytes))) acct.logger.Debug("missing_observations query response", zap.Int("numEntries", len(ret.Missing)), zap.String("result", string(respBytes)))
return ret.Missing, nil return ret.Missing, nil
} }
@ -357,7 +357,7 @@ func queryBatchTransferStatusForChunk(
} }
query := fmt.Sprintf(`{"batch_transfer_status":%s}`, string(bytes)) query := fmt.Sprintf(`{"batch_transfer_status":%s}`, string(bytes))
logger.Debug("acctaudit: submitting batch_transfer_status query", zap.String("query", query)) logger.Debug("submitting batch_transfer_status query", zap.String("query", query))
respBytes, err := qc.SubmitQuery(ctx, contract, []byte(query)) respBytes, err := qc.SubmitQuery(ctx, contract, []byte(query))
if err != nil { if err != nil {
return nil, fmt.Errorf("batch_transfer_status query failed: %w, %s", err, query) return nil, fmt.Errorf("batch_transfer_status query failed: %w, %s", err, query)
@ -373,6 +373,6 @@ func queryBatchTransferStatusForChunk(
ret[item.Key.String()] = item.Status ret[item.Key.String()] = item.Status
} }
logger.Debug("acctaudit: batch_transfer_status query response", zap.Int("numEntries", len(ret)), zap.String("result", string(respBytes))) logger.Debug("batch_transfer_status query response", zap.Int("numEntries", len(ret)), zap.String("result", string(respBytes)))
return ret, nil return ret, nil
} }

View File

@ -197,9 +197,9 @@ func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePubli
txResp, err := SubmitObservationsToContract(acct.ctx, acct.logger, acct.gk, gsIndex, guardianIndex, acct.wormchainConn, acct.contract, msgs) txResp, err := SubmitObservationsToContract(acct.ctx, acct.logger, acct.gk, gsIndex, guardianIndex, acct.wormchainConn, acct.contract, msgs)
if err != nil { if err != nil {
// This means the whole batch failed. They will all get retried the next audit cycle. // This means the whole batch failed. They will all get retried the next audit cycle.
acct.logger.Error("acct: failed to submit any observations in batch", zap.Int("numMsgs", len(msgs)), zap.Error(err)) acct.logger.Error("failed to submit any observations in batch", zap.Int("numMsgs", len(msgs)), zap.Error(err))
for idx, msg := range msgs { for idx, msg := range msgs {
acct.logger.Error("acct: failed to submit observation", zap.Int("idx", idx), zap.String("msgId", msg.MessageIDString())) acct.logger.Error("failed to submit observation", zap.Int("idx", idx), zap.String("msgId", msg.MessageIDString()))
} }
submitFailures.Add(float64(len(msgs))) submitFailures.Add(float64(len(msgs)))
@ -210,9 +210,9 @@ func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePubli
responses, err := GetObservationResponses(txResp) responses, err := GetObservationResponses(txResp)
if err != nil { if err != nil {
// This means the whole batch failed. They will all get retried the next audit cycle. // This means the whole batch failed. They will all get retried the next audit cycle.
acct.logger.Error("acct: failed to get responses from batch", zap.Error(err), zap.String("txResp", acct.wormchainConn.BroadcastTxResponseToString(txResp))) acct.logger.Error("failed to get responses from batch", zap.Error(err), zap.String("txResp", acct.wormchainConn.BroadcastTxResponseToString(txResp)))
for idx, msg := range msgs { for idx, msg := range msgs {
acct.logger.Error("acct: need to retry observation", zap.Int("idx", idx), zap.String("msgId", msg.MessageIDString())) acct.logger.Error("need to retry observation", zap.Int("idx", idx), zap.String("msgId", msg.MessageIDString()))
} }
submitFailures.Add(float64(len(msgs))) submitFailures.Add(float64(len(msgs)))
@ -222,9 +222,9 @@ func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePubli
if len(responses) != len(msgs) { if len(responses) != len(msgs) {
// This means the whole batch failed. They will all get retried the next audit cycle. // This means the whole batch failed. They will all get retried the next audit cycle.
acct.logger.Error("acct: number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err)) acct.logger.Error("number of responses does not match number of messages", zap.Int("numMsgs", len(msgs)), zap.Int("numResp", len(responses)), zap.Error(err))
for idx, msg := range msgs { for idx, msg := range msgs {
acct.logger.Error("acct: need to retry observation", zap.Int("idx", idx), zap.String("msgId", msg.MessageIDString())) acct.logger.Error("need to retry observation", zap.Int("idx", idx), zap.String("msgId", msg.MessageIDString()))
} }
submitFailures.Add(float64(len(msgs))) submitFailures.Add(float64(len(msgs)))
@ -238,22 +238,22 @@ func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePubli
status, exists := responses[msgId] status, exists := responses[msgId]
if !exists { if !exists {
// This will get retried next audit interval. // This will get retried next audit interval.
acct.logger.Error("acct: did not receive an observation response for message", zap.String("msgId", msgId)) acct.logger.Error("did not receive an observation response for message", zap.String("msgId", msgId))
submitFailures.Inc() submitFailures.Inc()
continue continue
} }
switch status.Type { switch status.Type {
case "pending": case "pending":
acct.logger.Info("acct: transfer is pending", zap.String("msgId", msgId)) acct.logger.Info("transfer is pending", zap.String("msgId", msgId))
case "committed": case "committed":
acct.handleCommittedTransfer(msgId) acct.handleCommittedTransfer(msgId)
case "error": case "error":
submitFailures.Inc() submitFailures.Inc()
acct.handleTransferError(msgId, status.Data, "acct: transfer failed") acct.handleTransferError(msgId, status.Data, "transfer failed")
default: default:
// This will get retried next audit interval. // This will get retried next audit interval.
acct.logger.Error("acct: unexpected status response on observation", zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data)) acct.logger.Error("unexpected status response on observation", zap.String("msgId", msgId), zap.String("status", status.Type), zap.String("text", status.Data))
submitFailures.Inc() submitFailures.Inc()
} }
} }
@ -267,11 +267,11 @@ func (acct *Accountant) handleCommittedTransfer(msgId string) {
defer acct.pendingTransfersLock.Unlock() defer acct.pendingTransfersLock.Unlock()
pe, exists := acct.pendingTransfers[msgId] pe, exists := acct.pendingTransfers[msgId]
if exists { if exists {
acct.logger.Info("acct: transfer has been committed, publishing it", zap.String("msgId", msgId)) acct.logger.Info("transfer has been committed, publishing it", zap.String("msgId", msgId))
acct.publishTransferAlreadyLocked(pe) acct.publishTransferAlreadyLocked(pe)
transfersApproved.Inc() transfersApproved.Inc()
} else { } else {
acct.logger.Debug("acct: transfer has been committed but it is no longer in our map", zap.String("msgId", msgId)) acct.logger.Debug("transfer has been committed but it is no longer in our map", zap.String("msgId", msgId))
} }
} }
@ -279,7 +279,7 @@ func (acct *Accountant) handleCommittedTransfer(msgId string) {
func (acct *Accountant) handleTransferError(msgId string, errText string, logText string) { func (acct *Accountant) handleTransferError(msgId string, errText string, logText string) {
if strings.Contains(errText, "insufficient balance") { if strings.Contains(errText, "insufficient balance") {
balanceErrors.Inc() balanceErrors.Inc()
acct.logger.Error("acct: insufficient balance error detected, dropping transfer", zap.String("msgId", msgId), zap.String("text", errText)) acct.logger.Error("insufficient balance error detected, dropping transfer", zap.String("msgId", msgId), zap.String("text", errText))
acct.deletePendingTransfer(msgId) acct.deletePendingTransfer(msgId)
} else { } else {
// This will get retried next audit interval. // This will get retried next audit interval.
@ -313,7 +313,7 @@ func SubmitObservationsToContract(
Payload: msg.Payload, Payload: msg.Payload,
} }
logger.Debug("acct: in SubmitObservationsToContract, encoding observation", logger.Debug("in SubmitObservationsToContract, encoding observation",
zap.Int("idx", idx), zap.Int("idx", idx),
zap.String("txHash", msg.TxHash.String()), zap.String("encTxHash", hex.EncodeToString(obs[idx].TxHash[:])), zap.String("txHash", msg.TxHash.String()), zap.String("encTxHash", hex.EncodeToString(obs[idx].TxHash[:])),
zap.Stringer("timeStamp", msg.Timestamp), zap.Uint32("encTimestamp", obs[idx].Timestamp), zap.Stringer("timeStamp", msg.Timestamp), zap.Uint32("encTimestamp", obs[idx].Timestamp),
@ -328,17 +328,17 @@ func SubmitObservationsToContract(
bytes, err := json.Marshal(obs) bytes, err := json.Marshal(obs)
if err != nil { if err != nil {
return nil, fmt.Errorf("acct: failed to marshal accountant observation request: %w", err) return nil, fmt.Errorf("failed to marshal accountant observation request: %w", err)
} }
digest, err := vaa.MessageSigningDigest(submitObservationPrefix, bytes) digest, err := vaa.MessageSigningDigest(submitObservationPrefix, bytes)
if err != nil { if err != nil {
return nil, fmt.Errorf("acct: failed to sign accountant Observation request: %w", err) return nil, fmt.Errorf("failed to sign accountant Observation request: %w", err)
} }
sigBytes, err := ethCrypto.Sign(digest.Bytes(), gk) sigBytes, err := ethCrypto.Sign(digest.Bytes(), gk)
if err != nil { if err != nil {
return nil, fmt.Errorf("acct: failed to sign accountant Observation request: %w", err) return nil, fmt.Errorf("failed to sign accountant Observation request: %w", err)
} }
sig := SignatureType{Index: guardianIndex, Signature: sigBytes} sig := SignatureType{Index: guardianIndex, Signature: sigBytes}
@ -353,7 +353,7 @@ func SubmitObservationsToContract(
msgBytes, err := json.Marshal(msgData) msgBytes, err := json.Marshal(msgData)
if err != nil { if err != nil {
return nil, fmt.Errorf("acct: failed to marshal accountant observation request: %w", err) return nil, fmt.Errorf("failed to marshal accountant observation request: %w", err)
} }
subMsg := wasmdtypes.MsgExecuteContract{ subMsg := wasmdtypes.MsgExecuteContract{
@ -363,7 +363,7 @@ func SubmitObservationsToContract(
Funds: sdktypes.Coins{}, Funds: sdktypes.Coins{},
} }
logger.Debug("acct: in SubmitObservationsToContract, sending broadcast", logger.Debug("in SubmitObservationsToContract, sending broadcast",
zap.Int("numObs", len(obs)), zap.Int("numObs", len(obs)),
zap.String("observations", string(bytes)), zap.String("observations", string(bytes)),
zap.Uint32("gsIndex", gsIndex), zap.Uint32("guardianIndex", guardianIndex), zap.Uint32("gsIndex", gsIndex), zap.Uint32("guardianIndex", guardianIndex),
@ -395,8 +395,8 @@ func SubmitObservationsToContract(
return txResp, fmt.Errorf("failed to submit observations: %s", txResp.TxResponse.RawLog) return txResp, fmt.Errorf("failed to submit observations: %s", txResp.TxResponse.RawLog)
} }
logger.Info("acct: done sending broadcast", zap.Int("numObs", len(obs)), zap.Int64("gasUsed", txResp.TxResponse.GasUsed), zap.Stringer("elapsedTime", time.Since(start))) logger.Info("done sending broadcast", zap.Int("numObs", len(obs)), zap.Int64("gasUsed", txResp.TxResponse.GasUsed), zap.Stringer("elapsedTime", time.Since(start)))
logger.Debug("acct: in SubmitObservationsToContract, done sending broadcast", zap.String("resp", wormchainConn.BroadcastTxResponseToString(txResp))) logger.Debug("in SubmitObservationsToContract, done sending broadcast", zap.String("resp", wormchainConn.BroadcastTxResponseToString(txResp)))
return txResp, nil return txResp, nil
} }

View File

@ -80,7 +80,7 @@ func (acct *Accountant) handleEvents(ctx context.Context, evts <-chan tmCoreType
case e := <-evts: case e := <-evts:
tx, ok := e.Data.(tmTypes.EventDataTx) tx, ok := e.Data.(tmTypes.EventDataTx)
if !ok { if !ok {
acct.logger.Error("acctwatcher: unknown data from event subscription", zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", e)) acct.logger.Error("unknown data from event subscription", zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", e))
continue continue
} }
@ -88,7 +88,7 @@ func (acct *Accountant) handleEvents(ctx context.Context, evts <-chan tmCoreType
if event.Type == "wasm-Observation" { if event.Type == "wasm-Observation" {
evt, err := parseEvent[WasmObservation](acct.logger, event, "wasm-Observation", acct.contract) evt, err := parseEvent[WasmObservation](acct.logger, event, "wasm-Observation", acct.contract)
if err != nil { if err != nil {
acct.logger.Error("acctwatcher: failed to parse wasm transfer event", zap.Error(err), zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", event)) acct.logger.Error("failed to parse wasm transfer event", zap.Error(err), zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", event))
continue continue
} }
@ -97,14 +97,14 @@ func (acct *Accountant) handleEvents(ctx context.Context, evts <-chan tmCoreType
} else if event.Type == "wasm-ObservationError" { } else if event.Type == "wasm-ObservationError" {
evt, err := parseEvent[WasmObservationError](acct.logger, event, "wasm-ObservationError", acct.contract) evt, err := parseEvent[WasmObservationError](acct.logger, event, "wasm-ObservationError", acct.contract)
if err != nil { if err != nil {
acct.logger.Error("acctwatcher: failed to parse wasm observation error event", zap.Error(err), zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", event)) acct.logger.Error("failed to parse wasm observation error event", zap.Error(err), zap.Stringer("e.Data", reflect.TypeOf(e.Data)), zap.Any("event", event))
continue continue
} }
errorEventsReceived.Inc() errorEventsReceived.Inc()
acct.handleTransferError(evt.Key.String(), evt.Error, "acct: transfer error event received") acct.handleTransferError(evt.Key.String(), evt.Error, "transfer error event received")
} else { } else {
acct.logger.Debug("acctwatcher: ignoring uninteresting event", zap.String("eventType", event.Type)) acct.logger.Debug("ignoring uninteresting event", zap.String("eventType", event.Type))
} }
} }
} }
@ -130,7 +130,7 @@ func parseEvent[T any](logger *zap.Logger, event tmAbci.Event, name string, cont
return nil, fmt.Errorf("%s event from unexpected contract: %s", name, string(attr.Value)) return nil, fmt.Errorf("%s event from unexpected contract: %s", name, string(attr.Value))
} }
} else { } else {
logger.Debug("acctwatcher: event attribute", zap.String("event", name), zap.String("key", string(attr.Key)), zap.String("value", string(attr.Value))) logger.Debug("event attribute", zap.String("event", name), zap.String("key", string(attr.Key)), zap.String("value", string(attr.Value)))
attrs[string(attr.Key)] = attr.Value attrs[string(attr.Key)] = attr.Value
} }
} }

View File

@ -65,7 +65,7 @@ func (d *Database) AcctGetData(logger *zap.Logger) ([]*common.MessagePublication
var pt common.MessagePublication var pt common.MessagePublication
err := json.Unmarshal(val, &pt) err := json.Unmarshal(val, &pt)
if err != nil { if err != nil {
logger.Error("acct: failed to unmarshal pending transfer for key", zap.String("key", string(key[:])), zap.Error(err)) logger.Error("failed to unmarshal pending transfer for key", zap.String("key", string(key[:])), zap.Error(err))
continue continue
} }

View File

@ -196,7 +196,7 @@ func (p *Processor) Run(ctx context.Context) error {
if p.acct != nil { if p.acct != nil {
shouldPub, err := p.acct.SubmitObservation(k) shouldPub, err := p.acct.SubmitObservation(k)
if err != nil { if err != nil {
return fmt.Errorf("acct: failed to process message `%s`: %w", k.MessageIDString(), err) return fmt.Errorf("failed to process message `%s`: %w", k.MessageIDString(), err)
} }
if !shouldPub { if !shouldPub {
continue continue
@ -206,11 +206,11 @@ func (p *Processor) Run(ctx context.Context) error {
case k := <-p.acctReadC: case k := <-p.acctReadC:
if p.acct == nil { if p.acct == nil {
return fmt.Errorf("acct: received an accountant event when accountant is not configured") return fmt.Errorf("received an accountant event when accountant is not configured")
} }
// SECURITY defense-in-depth: Make sure the accountant did not generate an unexpected message. // SECURITY defense-in-depth: Make sure the accountant did not generate an unexpected message.
if !p.acct.IsMessageCoveredByAccountant(k) { if !p.acct.IsMessageCoveredByAccountant(k) {
return fmt.Errorf("acct: accountant published a message that is not covered by it: `%s`", k.MessageIDString()) return fmt.Errorf("accountant published a message that is not covered by it: `%s`", k.MessageIDString())
} }
p.handleMessage(ctx, k) p.handleMessage(ctx, k)
case v := <-p.injectC: case v := <-p.injectC:
@ -238,7 +238,7 @@ func (p *Processor) Run(ctx context.Context) error {
if p.acct != nil { if p.acct != nil {
shouldPub, err := p.acct.SubmitObservation(k) shouldPub, err := p.acct.SubmitObservation(k)
if err != nil { if err != nil {
return fmt.Errorf("acct: failed to process message released by governor `%s`: %w", k.MessageIDString(), err) return fmt.Errorf("failed to process message released by governor `%s`: %w", k.MessageIDString(), err)
} }
if !shouldPub { if !shouldPub {
continue continue