87 lines
1.8 KiB
Go
87 lines
1.8 KiB
Go
|
package source
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
|
||
|
"go.mongodb.org/mongo-driver/bson"
|
||
|
"go.mongodb.org/mongo-driver/mongo"
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
// Watcher represents a listener of database changes.
|
||
|
type MongoWatcher struct {
|
||
|
db *mongo.Database
|
||
|
dbName string
|
||
|
handler WatcherFunc
|
||
|
logger *zap.Logger
|
||
|
}
|
||
|
|
||
|
// WatcherFunc is a function to send database changes.
|
||
|
type WatcherFunc func(*Event)
|
||
|
|
||
|
type watchEvent struct {
|
||
|
DocumentKey documentKey `bson:"documentKey"`
|
||
|
OperationType string `bson:"operationType"`
|
||
|
DbFullDocument Event `bson:"fullDocument"`
|
||
|
}
|
||
|
type documentKey struct {
|
||
|
ID string `bson:"_id"`
|
||
|
}
|
||
|
|
||
|
// Event represents a database change.
|
||
|
type Event struct {
|
||
|
ID string `bson:"_id"`
|
||
|
Vaas []byte
|
||
|
}
|
||
|
|
||
|
const queryTemplate = `
|
||
|
[
|
||
|
{
|
||
|
"$match" : {
|
||
|
"operationType" : "insert",
|
||
|
"ns": { "$in": [{"db": "%s", "coll": "vaasPythnet"}, {"db": "%s", "coll": "vaas"}] }
|
||
|
}
|
||
|
}
|
||
|
]
|
||
|
`
|
||
|
|
||
|
// NewWatcher creates a new database event watcher.
|
||
|
func NewMongoWatcher(db *mongo.Database, dbName string, handler WatcherFunc, logger *zap.Logger) *MongoWatcher {
|
||
|
return &MongoWatcher{
|
||
|
db: db,
|
||
|
dbName: dbName,
|
||
|
handler: handler,
|
||
|
logger: logger,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Start executes database event consumption.
|
||
|
func (w *MongoWatcher) Start(ctx context.Context) error {
|
||
|
query := fmt.Sprintf(queryTemplate, w.dbName, w.dbName)
|
||
|
var steps []bson.D
|
||
|
err := bson.UnmarshalExtJSON([]byte(query), true, &steps)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
stream, err := w.db.Watch(ctx, steps)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
go func() {
|
||
|
for stream.Next(ctx) {
|
||
|
var e watchEvent
|
||
|
if err := stream.Decode(&e); err != nil {
|
||
|
w.logger.Error("Error unmarshalling event", zap.Error(err))
|
||
|
continue
|
||
|
}
|
||
|
w.handler(&Event{
|
||
|
ID: e.DbFullDocument.ID,
|
||
|
Vaas: e.DbFullDocument.Vaas,
|
||
|
})
|
||
|
}
|
||
|
}()
|
||
|
return nil
|
||
|
}
|