lightwalletd/cmd/ingest/main.go

242 lines
5.5 KiB
Go
Raw Normal View History

package main
import (
"database/sql"
"encoding/binary"
"encoding/hex"
"flag"
"fmt"
"os"
"github.com/golang/protobuf/proto"
_ "github.com/mattn/go-sqlite3"
zmq "github.com/pebbe/zmq4"
"github.com/sirupsen/logrus"
"github.com/zcash-hackworks/lightwalletd/parser"
"github.com/zcash-hackworks/lightwalletd/storage"
)
var log *logrus.Entry
var logger = logrus.New()
var db *sql.DB
type Options struct {
zmqAddr string
zmqTopic string
dbPath string
logLevel uint64
logPath string
}
func main() {
opts := &Options{}
flag.StringVar(&opts.zmqAddr, "zmq-addr", "127.0.0.1:28332", "the address of the 0MQ publisher")
flag.StringVar(&opts.zmqTopic, "zmq-topic", "checkedblock", "the stream to listen to")
flag.StringVar(&opts.dbPath, "db-path", "", "the path to a sqlite database file")
flag.Uint64Var(&opts.logLevel, "log-level", uint64(logrus.InfoLevel), "log level (logrus 1-7)")
flag.StringVar(&opts.logPath, "log-file", "", "log file to write to")
// TODO prod metrics
// TODO support config from file and env vars
flag.Parse()
if opts.dbPath == "" {
flag.Usage()
os.Exit(1)
}
// Initialize logging
logger.SetFormatter(&logrus.TextFormatter{
//DisableColors: true,
FullTimestamp: true,
DisableLevelTruncation: true,
})
if opts.logPath != "" {
// instead write parsable logs for logstash/splunk/etc
output, err := os.OpenFile(opts.logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
"path": opts.logPath,
}).Fatal("couldn't open log file")
}
defer output.Close()
logger.SetOutput(output)
logger.SetFormatter(&logrus.JSONFormatter{})
}
logger.SetLevel(logrus.Level(opts.logLevel))
log = logger.WithFields(logrus.Fields{
"app": "zmqclient",
})
// Initialize database
db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_busy_timeout=10000&cache=shared", opts.dbPath))
db.SetMaxOpenConns(1)
if err != nil {
log.WithFields(logrus.Fields{
"db_path": opts.dbPath,
"error": err,
}).Fatal("couldn't open SQL db")
}
// Creates our tables if they don't already exist.
err = storage.CreateTables(db)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Fatal("couldn't create SQL tables")
}
// Initialize ZMQ
ctx, err := zmq.NewContext()
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Fatal("couldn't create zmq context")
}
defer ctx.Term()
// WARNING: The Socket is not thread safe. This means that you cannot
// access the same Socket from different goroutines without using something
// like a mutex.
sock, err := ctx.NewSocket(zmq.SUB)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Fatal("couldn't create zmq context socket")
}
err = sock.SetSubscribe(opts.zmqTopic)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
"stream": opts.zmqTopic,
}).Fatal("couldn't subscribe to stream")
}
connString := fmt.Sprintf("tcp://%s", opts.zmqAddr)
err = sock.Connect(connString)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
"connection": connString,
}).Fatal("couldn't connect to socket")
}
defer sock.Close()
log.Printf("Listening to 0mq on %s", opts.zmqAddr)
// Start listening for new blocks
for {
msg, err := sock.RecvMessageBytes(0)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Error("error on msg recv")
continue
}
if len(msg) < 3 {
log.WithFields(logrus.Fields{
"msg": msg,
}).Warn("got unknown message type")
continue
}
topic, body := msg[0], msg[1]
var sequence int
if len(msg[2]) == 4 {
sequence = int(binary.LittleEndian.Uint32(msg[len(msg)-1]))
}
switch string(topic) {
case opts.zmqTopic:
// there's an implicit mutex here
go handleBlock(db, sequence, body)
default:
log.WithFields(logrus.Fields{
"seqnum": sequence,
"topic": topic,
}).Warn("got message with unknown topic")
}
}
}
func handleBlock(db *sql.DB, sequence int, blockData []byte) {
block := parser.NewBlock()
rest, err := block.ParseFromSlice(blockData)
if err != nil {
log.WithFields(logrus.Fields{
"seqnum": sequence,
"error": err,
}).Error("error parsing block")
return
}
if len(rest) != 0 {
log.WithFields(logrus.Fields{
"seqnum": sequence,
"length": len(rest),
}).Warn("received overlong message")
return
}
blockHash := hex.EncodeToString(block.GetEncodableHash())
marshaledBlock, _ := proto.Marshal(block.ToCompact())
err = storage.StoreBlock(
db,
block.GetHeight(),
blockHash,
block.HasSaplingTransactions(),
marshaledBlock,
)
entry := log.WithFields(logrus.Fields{
"seqnum": sequence,
"block_height": block.GetHeight(),
"block_hash": hex.EncodeToString(block.GetDisplayHash()),
"block_version": block.GetVersion(),
"tx_count": block.GetTxCount(),
"sapling": block.HasSaplingTransactions(),
"error": err,
})
if err != nil {
entry.Error("new block")
} else {
entry.Info("new block")
}
for index, tx := range block.Transactions() {
txHash := hex.EncodeToString(tx.GetEncodableHash())
err = storage.StoreTransaction(
db,
block.GetHeight(),
blockHash,
index,
txHash,
tx.Bytes(),
)
entry = log.WithFields(logrus.Fields{
"block_height": block.GetHeight(),
"block_hash": hex.EncodeToString(block.GetDisplayHash()),
"tx_index": index,
"tx_size": len(tx.Bytes()),
"sapling": tx.HasSaplingTransactions(),
"error": err,
})
if err != nil {
entry.Error("storing tx")
} else {
entry.Debug("storing tx")
}
}
}